TensorFlow 多執行緒輸入資料處理框架

2021-10-19 19:29:39 字數 4936 閱讀 3005

在tensorflow中,佇列和變數類似,都是計算圖上有狀態的節點。其它的計算節點可以修改它們的狀態。對於變數,可以通過賦值操作修改變數的取值。對於佇列,修改佇列狀態的操作主要有enqueue、enqueuemany和dequeue:

import tensorflow as tf

# 建立乙個先進先出佇列,指定佇列中最多可以儲存兩個元素,並指定型別為整數

q = tf.fifoqueue(2, "int32")

# 使用enqueue_many函式來初始化佇列中的元素。和初始化變數類似,在使用佇列之前需要明確的呼叫這個初始化過程

init = q.enqueue_many([0, 10],)

# 使用dequeue函式將佇列中的第乙個元素出佇列,這個元素的值將被存在變數x中

x = q.dequeue()

y = x+1

q_inc = q.enqueue([y])

with tf.session() as sess:

init.run()

for _ in range(5):

v, _ = sess.run([x, q_inc])

print v

# 0,10,1,11,2

tensorflow中提供了fifoqueue(先進先出)和randomshufflequeue(將佇列中的元素打亂,每次出佇列操作得到的是從當前佇列所有元素中隨機選擇的乙個)兩種佇列。

在tensorflow中,佇列不僅僅是一種資料結構,還是非同步計算張量取值的乙個重要機制。比如多個執行緒可以同時向乙個佇列中寫元素,或者同時讀取乙個佇列中的元素。

tensorflow提供了tf.coordinator和tf.queuerunner兩個類來完成多執行緒協同的功能。tf.coordinator主要用於協同多個執行緒一起停止,並提供了should_stop、request_stop和join三個函式。在啟動執行緒之前,需要先宣告乙個tf.coordinator類,並將這個類傳入每乙個建立的執行緒中。啟動的執行緒需要一直查詢tf.coordinator類中提供的should_stop函式,當這個函式的返回值為true時,則當前執行緒也需要退出。每乙個啟動的執行緒都可以通過呼叫request_stop函式來通知其他執行緒退出。當某乙個執行緒呼叫request_stop函式之後,should_stop函式的返回值將被設定為true,這樣其他的執行緒就可以同時終止了,以下程式展示了如何使用tf.coordinator

import tensorflow as tf

import numpy as np

import threading

import time

# 執行緒中執行的程式,這個程式每隔1秒判斷是否需要停止並列印自己的id

def myloop(coord, worker_id):

# 使用tf.coordinator類提供的協同工具判斷當前執行緒是否需要停止

while not coord.should_stop():

# 隨機停止所有的執行緒

if np.random.rand() < 0.1:

coord.request_stop()

else:

print worker_id

time.sleep(1) #暫停1秒

# 宣告乙個tf.train.coordinator類來協同多個執行緒

coord = tf.train.coordinator()

# 宣告建立5個執行緒

threads = [threading.thread(target=myloop, args=(coord, i, )) for i in xrange(5)]

# 啟動所有的執行緒

for t in threads:

t.start()

# 等待所有執行緒退出

coord.join(threads)

當所有執行緒啟動之後,每個執行緒會列印各自的id,然後暫停1秒之後,所有執行緒又開始第二遍列印id。。。

tf.queuerunner主要用於啟動多個執行緒來操作同乙個佇列,啟動的這些執行緒可以通過tf.coordinator類來統一管理。以下**展示了如何使用tf.queuerunner和tf.coordinator來管理多執行緒佇列操作

import tensorflow as tf

queue = tf.fifoqueue(100, "float")

enqueue_op = queue.enqueue([tf.random_noamal([1])])

# 使用tf.train.queuerunner來建立多個執行緒執行佇列的入隊操作

# tf.train.queuerunner的第乙個引數給出了被操作的佇列,[enqueue_op]*5表示需要啟動5個執行緒,每個執行緒中執行的是enqueue_op操作

qr = tf.train.queuerunner(queue, [enqueue_op]*5)

# 將定義過的queuerunner加入tensorflow計算圖上指定的集合,tf.train.add_queue_runner函式沒有指定集合,則加入預設集合tf.graphkeys.queue_runners

tf.train.add_queue_runner(qr)

# 定義出佇列操作

out_tensor = queue.dequeue()

with tf.session() as sess:

# 使用tf.train.coordinator來協同啟動的執行緒

coord = tf.train.coordinator

# 使用tf.train.queuerunner時,需要明確呼叫tf.train.start_queue_runners來啟動所有執行緒。否則因為沒有執行緒執行入隊操作,當呼叫出隊操作時,程式會一直預設等待入隊操作被執行。tf.train.start_queue_runners函式會預設啟動tf.graphkeys.queue_runners集合中所有的queuerunner,因為這個函式只支援啟動指定集合中的queuerunner,所以一般來說tf.train.add_queue_runner函式和tf.train.start_quque_runners函式會指定同乙個集合。

threads = tf.train.start_queue_runners(sess=sess, coord=coord)

# 獲取佇列中的取值

for _ in range(3):

print sess.run(out_tensor)[0]

# 使用tf.train.coordinator來停止所有的執行緒

coord.request_stop()

coord.join(threads)

tensorflow提供了tf.train.batch和tf.train.shuffle_batch函式來將單個的樣例組織成batch的樣式輸出。這兩個函式都會生成乙個佇列,佇列的入隊操作是生成單個樣例的方法,而每次出對得到的是乙個batch的樣例。

import tensorflow as tf

example, label = features['i'], features['j']

batch_size = 3

# 組合樣例的佇列中最多可以儲存的樣例個數。佇列如果太大,那麼需要占用很多記憶體資源;如果太小,那麼出隊操作可能會因為沒有資料而被阻礙,從而導致訓練效率降低。一般來說這個佇列的大小會和每乙個batch的大小相關

capacity = 1000 + 3 * batch_size

# 使用tf.train.batch函式來組合樣例。

example_batch, label_batch = tf.train.batch([example,label], batch_size=batch_size, capacity=capacity)

# 當佇列長度等於容量時,tensorflow將暫停入隊操作,而只等待元素出隊。當元素數小於容量時,tensorflow將自動重新啟動入隊操作。

# 使用tf.train.shuffle_batch函式來組合樣例。

# tf.train.shuffle_batch函式的引數大部分都和tf.train.batch函式相似,但是min_after_dequeue引數是tf.train.shuffle_batch特有的,min_atfer_dequeue引數限制了出隊時佇列中元素的最小個數。當佇列中元素太少時。隨機打亂樣例的作用就不大了。當出隊函式被呼叫但是佇列中元素不夠時,出隊操作將等待更多的元素入隊才會完成。

example_batch, label_batch = tf.train.shuffle_batch([example,label], batch_size=batch_size, capacity=capacity, min_after_dequeue=30)

with tf.session() as sess:

tf.initialize_all_variables().run()

coord = tf.train.coordinator()

threads = tf.train.start_queue_runners(sess=sess, coord=coord)

for i in range(2):

cur_example_batch, cur_label_batch = sess.run([example_batch,label_batch])

coord.request_stop()

coord.join(threads)

tf.train.batch和tf.train.shuffle_batch函式除了可以將單個訓練資料整理成輸入batch,也提供了並行化處理輸入資料的方法。tf.train.batch函式和tf.train.shuffle_batch函式並行化的方式一致,通過設定函式中的num_threads引數,可以指定多個執行緒同時執行入隊操作,入隊操作就是讀取資料以及預處理的過程。當需要多個執行緒處理不同檔案中的樣例時,可以使用tf.train.shuffle_batch_join函式,此函式會從輸入檔案佇列中獲取不同的檔案分配給不同的執行緒。

Tensorflow使用多執行緒

tensorflow的session物件支援多執行緒,可以在同乙個session中建立多個執行緒,預設是cpu有多少個核,就啟動多少個執行緒。tensorflow提供了倆個類來實現對session中多執行緒的管理 tf.coordinator和tf.queuerunner,這倆個類必須一起使用。co...

tensorflow 佇列與多執行緒

1 tensorflow資料輸入簡介 為了避免影象預處理成為神經網路模型訓練效率的瓶頸,tensorflow提供了多執行緒處理輸入資料的框架。流程如下 1 指定原始資料的檔案列表 2 建立檔案列表佇列 3 從檔案中讀取資料 4 資料預處理 5 整理成batch作為神經網路輸入 tensorflow中...

TensorFlow佇列與多執行緒

1 tf.coordinatorimport numpy as np import threading import time import tensorflow as tf 執行緒中執行的程式,這個程式每隔1秒判斷是否需要停止並列印自己的id def myloop coord,worker id ...