自定義執行緒池

2022-03-19 10:29:03 字數 3201 閱讀 8078

版本一:

#!/usr/bin/env python

#-*- coding:utf-8 -*-

import

queue

import

threading

class

threadpool(object):

def__init__(self, max_num=20):

self.queue =queue.queue(max_num)

for i in

xrange(max_num):

self.queue.put(threading.thread)

defget_thread(self):

return

self.queue.get()

defadd_thread(self):

self.queue.put(threading.thread)

"""pool = threadpool(10)

def func(arg, p):

print arg

import time

time.sleep(2)

p.add_thread()

for i in xrange(30):

thread = pool.get_thread()

t = thread(target=func, args=(i, pool))

t.start()

"""

版本二:

#!/usr/bin/env python

#-*- coding:utf-8 -*-

"""custom threadpool

how to use:

pool = threadpool(1)

def callback(status, result):

# status, execute action status

# result, execute action return value

pass

def action(i):

pass

for i in range(20):

if pool.stop:

pool.terminal()

break

ret = pool.run(action, (i,), callback)

print 'end'

"""import

queue

import

threading

import

contextlib

stopevent =object()

class

threadpool(object):

def__init__

(self, max_num):

self.q =queue.queue(max_num)

self.max_num =max_num

self.cancel =false

self.generate_list =

self.free_list =

def run(self, func, args, callback=none):

"""執行緒池執行乙個任務

:param func: 任務函式

:param args: 任務函式所需引數

:param callback: 任務執行失敗或成功後執行的**函式,**函式有兩個引數1、任務函式執行狀態;2、任務函式返回值(預設為none,即:不執行**函式)

:return: 如果執行緒池已經終止,則返回true否則none

"""if

self.cancel:

return

true

if len(self.free_list) == 0 and len(self.generate_list) self.generate_thread()

w =(func, args, callback,)

self.q.put(w)

defgenerate_thread(self):

"""建立乙個執行緒

"""t = threading.thread(target=self.call)

t.start()

defcall(self):

"""迴圈去獲取任務函式並執行任務函式

"""current_thread =threading.currentthread

event =self.q.get()

while event !=stopevent:

func, arguments, callback =event

try:

result = func(*arguments)

success =true

except

exception, e:

success =false

result =none

if callback is

notnone:

try:

callback(success, result)

except

exception, e:

pass

with self.worker_state(self.free_list, current_thread):

event =self.q.get()

else

: self.generate_list.remove(current_thread)

defterminal(self):

"""終止執行緒池中的所有執行緒

"""self.cancel =true

full_size =len(self.generate_list)

while

full_size:

self.q.put(stopevent)

full_size -= [email protected]

defworker_state(self, state_list, worker_thread):

"""用於記錄執行緒中正在等待的執行緒數

"""

try:

yield

finally

: state_list.remove(worker_thread)

上下文管理:

自定義執行緒池

有些時候 jdk自帶的cachedthreadpool fixedthreadpool等執行緒池完成不了我們業務的需求時 可以用threadpoolexecutorg構造自定義的執行緒池。public class usethreadpoolexecutor1 這段 會首先執行任務1,然後把2 3 4...

自定義執行緒池

建立執行緒池方法 儘管executors提供了四種執行緒池建立的方式,但為了實現某些特定的需求,可以自己建立執行緒池。如在阿里的程式設計規範使用executors建立執行緒時,一般會報錯,並提示以下資訊 執行緒池不允許使用executors去建立,而是通過threadpoolexecutor的方式,...

自定義執行緒池

自定義執行緒池建立api 執行緒池建立通過juc的介面 executor 實現,平時我們使用其實現類 threadpoolexecutor 實現自定義執行緒池。常用建構函式 public threadpoolexecutor int corepoolsize,int maximumpoolsize,...