tensorflow 佇列與多執行緒

2021-08-08 05:48:26 字數 3917 閱讀 7112

1、tensorflow資料輸入簡介

為了避免影象預處理成為神經網路模型訓練效率的瓶頸,tensorflow提供了多執行緒處理輸入資料的框架。流程如下:

(1)指定原始資料的檔案列表

(2)建立檔案列表佇列

(3)從檔案中讀取資料

(4)資料預處理

(5)整理成batch作為神經網路輸入

tensorflow中的佇列不僅是一種資料結構,還提供了多執行緒的機制。佇列也是多執行緒輸入資料處理框架的基礎。

2、佇列與多執行緒

在tensorflow中,佇列和變數類似,都是計算圖上有狀態的節點。其他的計算節點可以修改他們的狀態。對於變數,可以通過賦值操作修改變數的取值。對於佇列,修改佇列狀態的操作主要有enqueue、enqueuemany和dequeue。以下程式展示了如何使用這些函式來操作乙個佇列。例項如下:

#建立乙個先進先出佇列,指定佇列中最多可以儲存兩個元素,並指定型別為整數

q = tf.fifoqueue(2, "int32")

print(q)

#使用enqueue_many函式來初始化佇列中的元素。和變數初始化類似,在使用佇列之前需要明確的呼叫這個初始化過程

#init = q.enqueue_many(([0,10],))

q1 = q.enqueue(1)

q2 = q.enqueue(2)

#使用dequeue函式將佇列中的第乙個元素出佇列。這個元素的值將被存在變數x中。

#x = q.dequeue()

#將得到的值加1

#y = x+1

#q_inc = q.enqueue([y])

with tf.session() as sess:

#執行初始化佇列的操作

#init.run()

q1.run()

q2.run()

for _ in range(2):

#print(sess.run(x))

print(sess.run(q.dequeue()))

print(sess.run(q.size()))

#q.si

#for _ in range(5):

#執行q_inc將執行資料出佇列、出隊的元素+1、重新加入佇列的整個過程。

#v, _ = sess.run([x, q_inc])

#print(v)

tensorflow提供了fifoqueue和randomshufflequeue兩種佇列。fifoqueue實現的是乙個先進先出佇列,randomshufflequeue提供元素打亂的佇列,每次出佇列操作時隨機選擇乙個。佇列提供非同步計算張量取值的乙個機制,如多個執行緒可以同時向乙個佇列中寫元素,或者同時讀取乙個佇列中的元素。

3、多執行緒協同輔助函式

tensorflow提供了tf.coordinator和tf.queuerunner兩個類來完成多執行緒協同的功能。tf.coordinator主要用於協同多個執行緒一起停止,並提供了should_stop、request_stop和join三個函式。在啟動執行緒之前,需要先宣告乙個tf.coordinator類,並將這個類傳入每乙個建立的執行緒中。啟動的執行緒需要一直查詢tf.coordinator類中提供的should_stop函式,當這個函式的返回值為true時,則當前執行緒也需要退出。每乙個啟動的執行緒都可以通過呼叫的request_stop函式來通知其他執行緒退出。當某乙個執行緒呼叫request_stop函式之後,should_stop函式的返回值將被設定為true,這樣其他的執行緒就可以同時終止了。例項如下:

import numpy as np

import threading

import time

#執行緒中執行的程式,這個程式每隔1秒判斷是否需要停止並列印自己的id

def myloop(coord, worker_id):

#使用tf.coordinator類提供的協同工具判斷當前執行緒是否需要停止

while not coord.should_stop():

#隨機停止所有的執行緒

if np.random.rand()<0.1:

print("stoping from id: %d" % worker_id)

coord.request_stop()

else:

#列印當前執行緒的id

print("working on id: %d\n" % worker_id)

time.sleep(1)

#宣告乙個tf.train.coordinator類來協同多個執行緒

coord = tf.train.coordinator()

#宣告建立5個執行緒

threads = [threading.thread(target=myloop, args=(coord, i)) for i in range(1,5)]

#啟動所有的執行緒

for t in threads:

t.start()

#等待所有執行緒退出

coord.join(threads)

tf.queuerunner主要用於啟動多個執行緒來操作同乙個佇列,啟動的這些執行緒可以通過上面介紹的tf.coordinator類來同意管理。通過tf.queuerunner和tf.coordinator來管理多執行緒佇列操作例項如下:

#宣告乙個先進先出佇列,佇列中最多100個元素,型別為實數

queue = tf.fifoqueue(100, "float")

#定義佇列的入隊操作

enqueue_op = queue.enqueue([tf.random_normal([1])])

#使用tf.train.queuerunner來建立多個執行緒執行佇列的入隊操作

#tf.train.queuerunner的第乙個引數給出了被操作的佇列,[enqueue_op]*5

#表示了需要啟動5個執行緒,每個執行緒中執行的是enqueue_op操作

qr = tf.train.queuerunner(queue, [enqueue_op]*5)

#將定義過的queuerunner加入tensorflow計算圖上指定的集合

#tf.train.add_queue_runner函式沒有指定集合,則加入預設集合tf.graphkeys.queue_runners.下面的函式

#就是將剛剛定義的qr加入預設的tf.graphkeys.queue_runners集合

tf.train.add_queue_runner(qr)

#定義出隊操作

out_tensor = queue.dequeue()

with tf.session() as sess:

#使用tf.train.coordinator來協同啟動的執行緒

coord = tf.train.coordinator()

#使用tf.train.queuerunner時,需要明確呼叫tf.train.start_queue_runners來啟動所有執行緒。

#否則因為沒有執行緒執行入隊操作,當呼叫出隊操作時,程式會一直等待入隊操作被執行。tf.train.start_queue_runners

#函式會預設啟動tf.graphkeys.queue_runners集合中所有的queuerunner。因為這個函式只支援啟動指定集合中

#的queuerunner,所以一般來說tf.train.add_queue_runner函式和tf.train.start_queue_runners函式會指定

#同乙個集合

threads = tf.train.start_queue_runners(sess=sess, coord=coord)

#獲取佇列中的取值

for _ in range(20):

print(sess.run(out_tensor))

#使用tf.train.coordinator來停止所有的執行緒

coord.request_stop()

coord.join(threads)

TensorFlow佇列與多執行緒

1 tf.coordinatorimport numpy as np import threading import time import tensorflow as tf 執行緒中執行的程式,這個程式每隔1秒判斷是否需要停止並列印自己的id def myloop coord,worker id ...

Tensorflow 順序佇列與IO操作

cpu負責tensorflow的計算,io負責讀取檔案 由於速度上的差異,通常做法是 主線程進行模型訓練,子執行緒讀取資料,二者通過佇列進行資料傳輸 相當於主線程從佇列讀資料,子程序往佇列放資料 在使用tensorflow進行非同步計算時,佇列是一種強大的機制。乙個簡單的例子。先建立乙個 先入先出 ...

ThreadPoolExecutor 多執行緒

from concurrent.futures import threadpoolexecutor,wait,all completed from queue import queue myqueue queue 佇列,用於儲存函式執行結果。多執行緒的問題之一 如何儲存函式執行的結果。def thr...