執行緒池原理及py實現

2021-10-23 13:58:09 字數 4371 閱讀 8518

目前的大多數網路伺服器,包括web伺服器、email伺服器以及資料庫伺服器等都具有乙個共同點,就是單位時間內必須處理數目巨大的

連線請求,但處理時間卻相對較短。

傳統多執行緒方案中我們採用的伺服器模型則是一旦接受到請求之後,即建立乙個新的執行緒,由該執行緒執行任務。任務執行完畢後,線

程退出,這就是是「即時建立, 即時銷毀」的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是

執行時間較短,而且執行次數極其頻繁,那麼伺服器將處於不停的建立執行緒,銷毀執行緒的狀態。

我們將傳統方案中的執行緒執行過程分為三個過程:t1、t2、t3:

t1:執行緒建立時間

t2:執行緒執行時間,包括執行緒的同步等時間

t3:執行緒銷毀時間

那麼我們可以看出,執行緒本身的開銷所佔的比例為(t1+t3) / (t1+t2+t3)。如果執行緒執行的時間很短的話,這比開銷可能佔到20%-50%左

右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。

除此之外,執行緒池能夠減少建立的執行緒個數。通常執行緒池所允許的併發執行緒是有上界的,如果同時需要併發的執行緒數超過上界,那麼

一部分執行緒將會等待。而傳統方案中,如果同時請求數目為2000,那麼最壞情況下,系統可能需要產生2000個執行緒。儘管這不是乙個很

大的數目,但是也有部分機器可能達不到這種要求。

因此執行緒池的出現正是著眼於減少執行緒池本身帶來的開銷。執行緒池採用預建立的技術,在應用程式啟動之後,將立即建立一定數量的

執行緒(n1),放入空閒佇列 中。這些執行緒都是處於阻塞(suspended)狀態,不消耗cpu,但占用較小的記憶體空間。當任務到來後,緩衝

池選擇乙個空閒執行緒,把任務傳入此執行緒中執行。當n1個執行緒都在處理任務後,緩衝池自動建立一定數量的新執行緒,用於處理更多的任

務。在任務執行完畢後執行緒也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閒時,大部分執行緒都一直處於暫停狀態,線

程池自動銷毀一部分執行緒,**系統資源。

基於這種預建立技術,執行緒池將執行緒建立和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的

執行緒本身開銷則越小,不過我們另外可能需要考慮進去執行緒之間同步所帶來的開銷。

一般執行緒池都必須具備下面幾個組成部分:

執行緒池管理器:用於建立並管理執行緒池

工作執行緒: 執行緒池中實際執行的執行緒

任務介面: 儘管執行緒池大多數情況下是用來支援網路伺服器,但是我們將執行緒執行的任務抽象出來,形成任務介面,從而是的執行緒池

與具體的任務無關。

任務佇列:執行緒池的概念具體到實現則可能是佇列,鍊錶之類的資料結構,其中儲存執行執行緒。

我們把任務放進佇列中去,然後開n個執行緒,每個執行緒都去佇列中取乙個任務,執行完了之後告訴系統說我執行完了,然後接著去佇列中

取下乙個任務,直至佇列中所有任務取空,退出執行緒。

實現

work類是乙個python執行緒池,不斷地從workqueue佇列中獲取需要執行的任務,執行之,並將結果寫入到resultqueue中。這裡的

workqueue和resultqueue都是執行緒安全的,其內部對各個執行緒的操作做了互斥。當從workqueue中獲取任務超時,則執行緒結束。

workermanager負責初始化python執行緒池,提供將任務加入佇列和獲取結果的介面,並能等待所有任務完成。

建立乙個 queue.queue() 的例項,然後使用資料對它進行填充。

將經過填充資料的例項傳遞給執行緒類,後者是通過繼承 threading.thread 的方式建立的。

生成守護執行緒池。

每次從佇列中取出乙個專案,

並使用該執行緒中的資料和 run 方法以執行相應的工作。

在完成這項工作之後,使用 queue.task_done() 函式向任務已經完成的佇列傳送乙個訊號。

對佇列執行 join 操作,實際上意味著等到隊列為空,再退出主程式。

import queue

import threading

import time

class

workmanager

(object):

def__init__

(self, work_num=

1000

,thread_num=2)

: self.work_queue = queue.queue(

) self.threads =

self.__init_work_queue(work_num)

self.__init_thread_pool(thread_num)

""" 初始化執行緒

"""def__init_thread_pool

(self,thread_num)

:for i in

range

(thread_num):)

""" 初始化工作佇列

"""def__init_work_queue

(self, jobs_num)

:for i in

range

(jobs_num)

: self.add_job(do_job, i)

""" 新增一項工作入隊

"""defadd_job

(self, func,

*args)

: self.work_queue.put(

(func,

list

(args)))

#任務入隊,queue內部實現了同步機制

""" 等待所有執行緒執行完畢

"""defwait_allcomplete

(self)

:for item in self.threads:

if item.isalive(

):item.join(

)class

work

(threading.thread)

:def

__init__

(self, work_queue)

: threading.thread.__init__(self)

self.work_queue = work_queue

self.start(

)def

run(self)

:#死迴圈,從而讓建立的執行緒在一定條件下關閉退出

while

true

:try

: do, args = self.work_queue.get(block=

false

)#任務非同步出隊,queue內部實現了同步機制

do(args)

self.work_queue.task_done(

)#通知系統任務完成

except

:break

#具體要做的任務

defdo_job

(args)

: time.sleep(

0.1)

#模擬處理時間

print

(threading.current_thread(),

list

(args)

)if __name__ ==

'__main__'

: start = time.time(

) work_manager = workmanager(

10000,10

)#或者work_manager = workmanager(10000, 20)

work_manager.wait_allcomplete(

) end = time.time(

)print

("cost all time: %s"

%(end-start)

)

在使用這個模式時需要注意一點:通過將守護執行緒設定為 true,將允許主線程或者程式僅在守護執行緒處於活動狀態時才能夠退出。這種方

式建立了一種簡單的方式以控制程式流程,因為在退出之前,您可以對佇列執行 join 操作、或者等到隊列為空。佇列模組文件詳細說明了

實際的處理過程,請參見參考資料:

join()

保持阻塞狀態,直到處理了佇列中的所有專案為止。在將乙個專案新增到該佇列時,未完成的任務的總數就會增加。當使用者執行緒呼叫 task_done() 以表示檢索了該專案、並完成了所有的工作時,那麼未完成的任務的總數就會減少。當未完成的任務的總數減少到零時,join() 就會結束阻塞狀態。

**:原文

執行緒池原理及python實現

為什麼需要執行緒池 目前的大多數網路伺服器,包括web伺服器 email伺服器以及資料庫伺服器等都具有乙個共同點,就是單位時間內必須處理數目巨大的連線請求,但處理時間卻相對較短。傳統多執行緒方案中我們採用的伺服器模型則是一旦接受到請求之後,即建立乙個新的執行緒,由該執行緒執行任務。任務執行完畢後,執...

Linux執行緒池原理及C 實現

在多執行緒程式中如果頻繁的建立和結束乙個執行緒這樣會使系統的效能降低,這時我們可以建立乙個執行緒 池來完成這些任務執行完後讓其阻塞等待其他的任務這樣就可以提高系統的效能 乙個執行緒池要包括以下幾部分 1 執行緒池管理器 threadpool 用於建立並管理執行緒池,包括 建立執行緒池,銷毀執行緒池,...

執行緒池實現原理

上面這幅圖作者表達的不夠完整,作者想通過如下文本來表達內含本質。過程如下 如果請求執行緒小於執行緒池目標執行緒,則執行緒池會新建立執行緒來處理請求 如果請求執行緒數過多,超過了目標執行緒則將請求任務放入佇列中進行緩衝 如果佇列滿了 但未達到最大執行緒池數,這時會新建立執行緒 直到上限為止即maxpo...