tensorflow queue 多執行緒與對列

2021-08-19 01:29:56 字數 4741 閱讀 9941

我們必須要把資料先讀入後才能進行計算,假設讀入用時0.1s,計算用時0.9s,那麼就意味著每過1s,gpu都會有0.1s無事可做,這就大大降低了運算的效率。

解決這個問題方法就是將讀入資料和計算分別放在兩個執行緒中,將資料讀入記憶體的乙個佇列

讀取執行緒源源不斷地將檔案系統中的讀入到乙個記憶體的佇列中,而負責計算的是另乙個執行緒,計算需要資料時,直接從記憶體佇列中取就可以了。這樣就可以解決gpu因為io而空閒的問題!

在tensorflow中,為了方便管理,在記憶體佇列前又新增了一層所謂的「檔名佇列」。tensorflow使用檔名佇列+記憶體佇列雙佇列的形式讀入檔案,可以很好地管理epoch。

對於檔名佇列,我們使用tf.train.string_input_producer函式。這個函式需要傳入乙個檔名list,系統會自動將它轉為乙個檔名佇列。tf.train.string_input_producer還有兩個重要的引數,乙個是num_epochs,表示epoch數,即限制載入出事檔案列表的最大輪數,當所有檔案都已經被使用了被設定的輪數後,如果繼續嘗試讀取新的檔案輸入佇列會報錯(outofrange)。另外乙個就是shuffle是指在乙個epoch內檔案的順序是否被打亂。shuffle=true時,檔案在加入佇列前會被打亂順序。

epochs:

當乙個完整的資料集通過了神經網路一次並且返回了一次,這個過程稱為乙個 epoch。

在tensorflow中,記憶體佇列不需要我們自己建立,我們只需要使用reader物件從檔名佇列中讀取資料就可以了。

在我們使用tf.train.string_input_producer建立檔名佇列後,整個系統其實還是處於「停滯狀態」的,也就是說,我們檔名並沒有真正被加入到佇列中,此時如果我們開始計算,因為記憶體佇列中什麼也沒有,計算單元就會一直等待,導致整個系統被阻塞。使用tf.train.start_queue_runners之後,才會啟動填充佇列的執行緒,這時系統就不再「停滯」。此後計算單元就可以拿到資料並進行計算,整個程式也就跑起來了。

reader每次讀取一張並儲存。

import

tensorflow as tf 23

#新建乙個session

4with tf.session() as sess:5#

讀三幅a.jpg, b.jpg, c.jpg

7#string_input_producer會產生乙個檔名佇列

8 filename_queue = tf.train.string_input_producer(filename, shuffle=false, num_epochs=5)9#

reader從檔名佇列中讀資料。對應的方法是reader.read

10 reader =tf.wholefilereader()

11 key, value =reader.read(filename_queue)12#

tf.train.string_input_producer定義了乙個epoch變數,要對它進行初始化

13tf.local_variables_initializer().run()14#

使用start_queue_runners之後,才會開始填充佇列

15 threads = tf.train.start_queue_runners(sess=sess)

16 i =0

17while

true:

18 i += 119#

獲取資料並儲存

三個概念:

queue:佇列

tf.fifoqueue(capacity, dtypes, shapes=none, names=none ...) #建立函式的引數

import tensorflow as tf

q=tf.fifoqueue(2,"int32")#建立佇列,指定佇列中最多可以儲存兩個元素,並制定元素型別為整數型

init=q.enqueue_many(([0,10],))#使用enqueue_many函式初始化佇列

x=q.dequeue()#使用dequeue函式將佇列中的第乙個元素出佇列,這個元素的值將被儲存在變數x中

y=x+1#將得到的值加1

q_inc=q.enqueue([y])#將得到的值在重新入佇列

with tf.session() as sess:

init.run()#執行初始化佇列的操作

for _ in rangr(5):

v,_ =sess.run([x,q_inc])#執行q_inc將執行資料出佇列,出隊元素加1,重新入佇列的整個過程

print v

輸出

'''佇列開始有列表[1,10]兩個元素,第乙個出隊列為0,加1後入佇列得到[10,1];第二次出隊列為10,加1 後入隊為[1,11];以此類推。010

1112'''

以上為如何使用fifoqueue,另外還有randomshufflequeue,此函式會將佇列中的元素打亂,出佇列是為隨機的乙個。

queuerunner

tensorflow的計算主要在使用cpu/gpu和記憶體,而資料讀取涉及磁碟操作,速度遠低於前者操作。因此通常會使用多個執行緒讀取資料,然後

使用乙個執行緒消費資料,queuerunner就是來管理這些讀寫佇列的執行緒。

1 import tensorflow as tf   

2 import sys

3 q = tf.fifoqueue(10, "float")

4 counter = tf.variable(0.0) #計數器 tf.variable(0.0) ==0 一維張量

5 # 給計數器加一

6 increment_op = tf.assign_add(counter, 1.0)

7 # 將計數器加入佇列

8 enqueue_op = q.enqueue(counter)

910 # 建立queuerunner

11 # 用多個執行緒向佇列新增資料

12 # 這裡實際建立了4個執行緒,兩個增加計數,兩個執行入隊

13 qr = tf.train.queuerunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)

1415 # 主線程

16 sess = tf.interactivesession()

17 tf.global_variables_initializer().run()

18 # 啟動入隊執行緒

19 qr.create_threads(sess, start=true)

20 for i in range(20):

21 print (sess.run(q.dequeue()))

增加計數的程序會不停的後台執行,執行入隊的程序會先執行10次(因為佇列長度只有10),然後主線程開始消費資料,當一部分資料消費被後,入隊的程序又會開始執行。最終主線程消費完20個資料後停止,但其他執行緒繼續執行,程式不會結束。

1import

tensorflow as tf

2import

sys

3q = tf.fifoqueue(10,

"float")

4counter = tf.variable(0.0)

#計數器5#

給計數器加一

6increment_op = tf.assign_add(counter, 1.0)7

#將計數器加入佇列

8enqueue_op =

q.enqueue(counter)910

#建立queuerunner11#

用多個執行緒向佇列新增資料12#

這裡實際建立了4個執行緒,兩個增加計數,兩個執行入隊

13qr = tf.train.queuerunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)14

15#主線程16

sess =

tf.interactivesession()

17tf.global_variables_initializer().run()18#

啟動入隊執行緒

19qr.create_threads(sess, start=

true)

20for

i in

range(20

):21

print

(sess.run(q.dequeue()))

對多執行緒進行測試

在junit下,很難對多執行緒的 進行測試,因此需要借助其他的方式,比如executor service框架.最近要對乙個監控類的輸出情況寫乙個測試,簡單的跑一下輸出格式是否跟預期一致.我這裡借助了threadpoolexecutor completionservice.即在結束前,需要從執行緒池中...

對多執行緒的理解

對執行緒的理解 1.當多個執行緒訪問同乙個靜態變數時,會發生執行緒安全問題,其中乙個執行緒對這個靜態變數修改值後,其餘執行緒在使用這個靜態變數就會收到值更新的影響,導致執行緒中其他地方使用這個值受影響。舉個例子 靜態變數 staitic string str 1 執行緒1 str 2 print s...

多執行緒 Java多執行緒與併發

實現的方式主要有三種 執行緒的狀態 基本差別 最主要的本質區別 兩個概念 鎖池 假設執行緒a已經擁有了某個物件 不是類 的鎖,而其他執行緒b c想要呼叫這個物件的某個synchronized方法 或者塊 由於b c執行緒在進入物件的synchronized方法 或者塊 之前必須先獲得該物件鎖的擁有權...