Python之路第十一天,高階 3 執行緒池

2022-08-02 06:09:11 字數 3514 閱讀 5276

簡單的執行緒池的實現:

import queue

import threading

import time

class threadpool(object):

def __init__(self, max_num=20):

self.queue = queue.queue(max_num)

for i in range(max_num):

self.queue.put(threading.thread)

def get_thread(self):

return self.queue.get()

def add_thread(self):

self.queue.put(threading.thread)

def func(arg, p):

print(arg)

time.sleep(2)

p.add_thread()

if __name__ == '__main__':

pool = threadpool(10)

for i in range(30):

thread = pool.get_thread()

t = thread(target=func, args=(i, pool))

t.start()

複雜版本的實現:

#!/usr/bin/env python3

import queue

import threading

import contextlib

import time

stopevent = object()

class threadpool(object):

def __init__(self, max_num, max_task_num = none):

if max_task_num:

self.q = queue.queue(max_task_num)

else:

self.q = queue.queue()

self.max_num = max_num

self.cancel = false

self.terminal = false

self.generate_list =

self.free_list =

def run(self, func, args, callback=none):

"""執行緒池執行乙個任務

:param func: 任務函式

:param args: 任務函式所需引數

:param callback: 任務執行失敗或成功後執行的**函式,**函式有兩個引數1、任務函式執行狀態;2、任務函式返回值(預設為none,即:不執行**函式)

:return: 如果執行緒池已經終止,則返回true否則none

"""if self.cancel:

return

if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:

self.generate_thread()

w = (func, args, callback,)

self.q.put(w)

def generate_thread(self):

"""建立乙個執行緒

"""t = threading.thread(target=self.call)

t.start()

def call(self):

"""迴圈去獲取任務函式並執行任務函式

"""current_thread = threading.currentthread

event = self.q.get()

while event != stopevent:

func, arguments, callback = event

try:

result = func(*arguments)

success = true

except exception as e:

success = false

result = none

if callback is not none:

try:

callback(success, result)

except exception as e:

pass

with self.worker_state(self.free_list, current_thread):

if self.terminal:

event = stopevent

else:

event = self.q.get()

else:

self.generate_list.remove(current_thread)

def close(self):

"""執行完所有的任務後,所有執行緒停止

"""self.cancel = true

full_size = len(self.generate_list)

while full_size:

self.q.put(stopevent)

full_size -= 1

def terminate(self):

"""無論是否還有任務,終止執行緒

"""self.terminal = true

while self.generate_list:

self.q.put(stopevent)

self.q.empty()

@contextlib.contextmanager

def worker_state(self, state_list, worker_thread):

"""用於記錄執行緒中正在等待的執行緒數

"""try:

yield

finally:

state_list.remove(worker_thread)

def callback(status, result):

# status, execute action status

# result, execute action return value

pass

def action(i):

print(i)

if __name__ == '__main__':

pool = threadpool(5)

for i in range(30):

pool.run(action, (i,), callback)

time.sleep(5)

print(len(pool.generate_list), len(pool.free_list))

print(len(pool.generate_list), len(pool.free_list))

pool.close()

pool.terminate()

python第十一天

函式總結 def func a,b print a,b return a b 四個組成部分 函式名 呼叫函式的依據 函式體 執行函式邏輯的 引數列表 為函式提供內部資源 返回值 將函式執行結果返回給外界 返回值 1 空返回 沒有return或空return 2 一鍵返回 3 多值返回 裝有多個值的元...

Python 集合 第十一天

集合 set 是乙個無序的不重複元素序列。可以使用大括號 或者 set 函式建立集合,注意 建立乙個空集合必須用set 而不是 因為 是用來建立乙個空字典。建立方式 parame 或者set value 給個例項 語法格式如下 s.add x 將元素 x 新增到集合 s 中,如果元素已存在,則不進行...

UnixC第十一天

回憶昨天內容 一 訊號阻塞 sigprocmask 2 sigset t 訊號阻塞和訊號忽略的區別 可靠訊號 不可靠 訊號丟失 二 獲取程序的未決訊號集 從未決訊號集中找未決訊號 sigpending 2 什麼是未決訊號?三 訊號從產生到處理的整個過程 四 system v ipc 訊息佇列 獲取乙...