我們必須要把資料先讀入後才能進行計算,假設讀入用時0.1s,計算用時0.9s,那麼就意味著每過1s,gpu都會有0.1s無事可做,這就大大降低了運算的效率。
解決這個問題方法就是將讀入資料和計算分別放在兩個執行緒中,將資料讀入記憶體的乙個佇列
讀取執行緒源源不斷地將檔案系統中的讀入到乙個記憶體的佇列中,而負責計算的是另乙個執行緒,計算需要資料時,直接從記憶體佇列中取就可以了。這樣就可以解決gpu因為io而空閒的問題!
在tensorflow中,為了方便管理,在記憶體佇列前又新增了一層所謂的「檔名佇列」。tensorflow使用檔名佇列+記憶體佇列雙佇列的形式讀入檔案,可以很好地管理epoch。
對於檔名佇列,我們使用tf.train.string_input_producer函式。這個函式需要傳入乙個檔名list,系統會自動將它轉為乙個檔名佇列。tf.train.string_input_producer還有兩個重要的引數,乙個是num_epochs,表示epoch數,即限制載入出事檔案列表的最大輪數,當所有檔案都已經被使用了被設定的輪數後,如果繼續嘗試讀取新的檔案輸入佇列會報錯(outofrange)。另外乙個就是shuffle是指在乙個epoch內檔案的順序是否被打亂。shuffle=true時,檔案在加入佇列前會被打亂順序。
epochs:
當乙個完整的資料集通過了神經網路一次並且返回了一次,這個過程稱為乙個 epoch。
在tensorflow中,記憶體佇列不需要我們自己建立,我們只需要使用reader物件從檔名佇列中讀取資料就可以了。
在我們使用tf.train.string_input_producer建立檔名佇列後,整個系統其實還是處於「停滯狀態」的,也就是說,我們檔名並沒有真正被加入到佇列中,此時如果我們開始計算,因為記憶體佇列中什麼也沒有,計算單元就會一直等待,導致整個系統被阻塞。使用tf.train.start_queue_runners之後,才會啟動填充佇列的執行緒,這時系統就不再「停滯」。此後計算單元就可以拿到資料並進行計算,整個程式也就跑起來了。
reader每次讀取一張並儲存。
import三個概念:tensorflow as tf 23
#新建乙個session
4with tf.session() as sess:5#
讀三幅a.jpg, b.jpg, c.jpg
7#string_input_producer會產生乙個檔名佇列
8 filename_queue = tf.train.string_input_producer(filename, shuffle=false, num_epochs=5)9#
reader從檔名佇列中讀資料。對應的方法是reader.read
10 reader =tf.wholefilereader()
11 key, value =reader.read(filename_queue)12#
tf.train.string_input_producer定義了乙個epoch變數,要對它進行初始化
13tf.local_variables_initializer().run()14#
使用start_queue_runners之後,才會開始填充佇列
15 threads = tf.train.start_queue_runners(sess=sess)
16 i =0
17while
true:
18 i += 119#
獲取資料並儲存
queue:佇列
tf.fifoqueue(capacity, dtypes, shapes=none, names=none ...) #建立函式的引數
import tensorflow as tf
q=tf.fifoqueue(2,"int32")#建立佇列,指定佇列中最多可以儲存兩個元素,並制定元素型別為整數型
init=q.enqueue_many(([0,10],))#使用enqueue_many函式初始化佇列
x=q.dequeue()#使用dequeue函式將佇列中的第乙個元素出佇列,這個元素的值將被儲存在變數x中
y=x+1#將得到的值加1
q_inc=q.enqueue([y])#將得到的值在重新入佇列
with tf.session() as sess:
init.run()#執行初始化佇列的操作
for _ in rangr(5):
v,_ =sess.run([x,q_inc])#執行q_inc將執行資料出佇列,出隊元素加1,重新入佇列的整個過程
print v
輸出
'''佇列開始有列表[1,10]兩個元素,第乙個出隊列為0,加1後入佇列得到[10,1];第二次出隊列為10,加1 後入隊為[1,11];以此類推。010
1112'''
以上為如何使用fifoqueue,另外還有randomshufflequeue,此函式會將佇列中的元素打亂,出佇列是為隨機的乙個。
queuerunnertensorflow的計算主要在使用cpu/gpu和記憶體,而資料讀取涉及磁碟操作,速度遠低於前者操作。因此通常會使用多個執行緒讀取資料,然後
使用乙個執行緒消費資料,queuerunner就是來管理這些讀寫佇列的執行緒。
1 import tensorflow as tf
2 import sys
3 q = tf.fifoqueue(10, "float")
4 counter = tf.variable(0.0) #計數器 tf.variable(0.0) ==0 一維張量
5 # 給計數器加一
6 increment_op = tf.assign_add(counter, 1.0)
7 # 將計數器加入佇列
8 enqueue_op = q.enqueue(counter)
910 # 建立queuerunner
11 # 用多個執行緒向佇列新增資料
12 # 這裡實際建立了4個執行緒,兩個增加計數,兩個執行入隊
13 qr = tf.train.queuerunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
1415 # 主線程
16 sess = tf.interactivesession()
17 tf.global_variables_initializer().run()
18 # 啟動入隊執行緒
19 qr.create_threads(sess, start=true)
20 for i in range(20):
21 print (sess.run(q.dequeue()))
增加計數的程序會不停的後台執行,執行入隊的程序會先執行10次(因為佇列長度只有10),然後主線程開始消費資料,當一部分資料消費被後,入隊的程序又會開始執行。最終主線程消費完20個資料後停止,但其他執行緒繼續執行,程式不會結束。
1import
tensorflow as tf
2import
sys
3q = tf.fifoqueue(10,
"float")
4counter = tf.variable(0.0)
#計數器5#
給計數器加一
6increment_op = tf.assign_add(counter, 1.0)7
#將計數器加入佇列
8enqueue_op =
q.enqueue(counter)910
#建立queuerunner11#
用多個執行緒向佇列新增資料12#
這裡實際建立了4個執行緒,兩個增加計數,兩個執行入隊
13qr = tf.train.queuerunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)14
15#主線程16
sess =
tf.interactivesession()
17tf.global_variables_initializer().run()18#
啟動入隊執行緒
19qr.create_threads(sess, start=
true)
20for
i in
range(20
):21
(sess.run(q.dequeue()))
對多執行緒進行測試
在junit下,很難對多執行緒的 進行測試,因此需要借助其他的方式,比如executor service框架.最近要對乙個監控類的輸出情況寫乙個測試,簡單的跑一下輸出格式是否跟預期一致.我這裡借助了threadpoolexecutor completionservice.即在結束前,需要從執行緒池中...
對多執行緒的理解
對執行緒的理解 1.當多個執行緒訪問同乙個靜態變數時,會發生執行緒安全問題,其中乙個執行緒對這個靜態變數修改值後,其餘執行緒在使用這個靜態變數就會收到值更新的影響,導致執行緒中其他地方使用這個值受影響。舉個例子 靜態變數 staitic string str 1 執行緒1 str 2 print s...
多執行緒 Java多執行緒與併發
實現的方式主要有三種 執行緒的狀態 基本差別 最主要的本質區別 兩個概念 鎖池 假設執行緒a已經擁有了某個物件 不是類 的鎖,而其他執行緒b c想要呼叫這個物件的某個synchronized方法 或者塊 由於b c執行緒在進入物件的synchronized方法 或者塊 之前必須先獲得該物件鎖的擁有權...