用 Python 實現的執行緒池

2021-06-16 11:01:48 字數 3269 閱讀 6856

用 python 實現的執行緒池

用 python 實現的執行緒池

軟體技術

lhwork 發表於 2007-2-1 8:53:26

為了提高程式的效率,經常要用到多執行緒,尤其是io等需要等待外部響應的部分。執行緒的建立、銷毀和排程本身是有代價的,如果乙個執行緒的任務相對簡單,那這些時間和空間開銷就不容忽視了,此時用執行緒池就是更好的選擇,即建立一些執行緒然後反覆利用它們,而不是在完成單個任務後就結束。

下面是用python實現的通用的執行緒池**:

undefined

view plain

copy to clipboard

print?

import

queue, threading, sys

from

threading

import thread   

import

time,urllib

# working thread 

class worker(thread):   

worker_count = 0   

def__init__( self, workqueue, resultqueue, timeout = 0, **kwds):   

thread.__init__( self, **kwds )   

self.id = worker.worker_count   

worker.worker_count += 1   

self.setdaemon( true )   

self.workqueue = workqueue   

self.resultqueue = resultqueue   

self.timeout = timeout   

def run( self ):   

''' the get-some-work, do-some-work main loop of worker threads '''

while

true:   

try:   

callable, args, kwds = self.workqueue.get(timeout=self.timeout)   

res = callable(*args, **kwds)   

print "worker[%2d]: %s" % (self.id, str(res) )   

self.resultqueue.put( res )   

except

queue.empty:   

break

except :   

print 'worker[%2d]' % self.id, sys.exc_info()[:2]   

class workermanager:   

def__init__( self, num_of_workers=10, timeout = 1):   

self.workqueue = queue.queue()   

self.resultqueue = queue.queue()   

self.workers =    

self.timeout = timeout   

self._recruitthreads( num_of_workers )   

def _recruitthreads( self, num_of_workers ):   

for i in

range( num_of_workers ):   

worker = worker( self.workqueue, self.resultqueue, self.timeout )   

def start(self):   

for w in

self.workers:   

w.start()   

def wait_for_complete( self):   

# ...then, wait for each of them to terminate: 

while

len(self.workers):   

worker = self.workers.pop()   

worker.join( )   

if worker.isalive() and

notself.workqueue.empty():   

print "all jobs are are completed."   

def add_job( self, callable, *args, **kwds ):   

self.workqueue.put( (callable, args, kwds) )   

def get_result( self, *args, **kwds ):   

return

self.resultqueue.get( *args, **kwds )   

worker類是乙個工作執行緒,不斷地從workqueue佇列中獲取需要執行的任務,執行之,並將結果寫入到resultqueue中,這裡的workqueue和resultqueue都是現成安全的,其內部對各個執行緒的操作做了互斥。當從workqueue中獲取任務超時,則執行緒結束。

workermanager負責初始化worker執行緒,提供將任務加入佇列和獲取結果的介面,並能等待所有任務完成。

undefined

view plain

copy to clipboard

print?

def test_job(id, sleep = 0.001 ):   

try:   

urllib.urlopen('').read()   

except:   

print '[%4d]' % id, sys.exc_info()[:2]   

return

iddef test():   

import

socket

socket.setdefaulttimeout(10)   

print 'start testing'   

wm = workermanager(10)   

for i in

range(500):   

wm.add_job( test_job, i, i*0.001 )   

wm.start()   

wm.wait_for_complete()   

print 'end testing'   

用python實現的執行緒池

python3標準庫里自帶執行緒池threadpoolexecutor 和程序池processpoolexecutor 當然也可以自己寫乙個threadpool。coding utf 8 import queue import threading import sys import time imp...

用python建立執行緒池

物件導向開發中,大家知道建立和銷毀物件是很費時間的,因為建立乙個物件要獲取記憶體資源或者其它更多資源。無節制的建立和銷毀執行緒是一種極大的浪費。那我們可不可以把執行完任務的執行緒不銷毀而重複利用呢?彷彿就是把這些執行緒放進乙個池子,一方面我們可以控制同時工作的執行緒數量,一方面也避免了建立和銷毀產生...

Python的執行緒池實現

實現 coding utf 8 import queue import threading import sys import time import urllib 替我們工作的執行緒池中的執行緒 class mythread threading.thread def init self,workq...