Django非同步任務執行緒池

2022-05-12 20:37:22 字數 2851 閱讀 5644

當資料庫資料量很大時(百萬級),許多批量資料修改請求的響應會非常慢,一些不需要即時響應的任務可以放到後台的非同步執行緒中完成,發起非同步任務的請求就可以立即響應

選擇用執行緒池的原因是:執行緒比程序更為可控。不像子程序,子執行緒會在所屬程序結束時立即結束。執行緒可共享記憶體。

使用python manage.py runserver模式啟動的django應用只有乙個程序,對於每個請求,主線程會開啟乙個子執行緒來處理請求。請求子執行緒向主線程申請乙個新執行緒,然後把耗時的任務交給新執行緒,自身立即響應,這就是請求任務非同步處理的原理。

如果想要管理這批非同步執行緒,知道他們是否在執行中,可以使用執行緒池(threadpoolexecutor)。

執行緒池會先啟動若干數量的執行緒,並讓這些執行緒都處於睡眠狀態,當向執行緒池submit乙個任務後,會喚醒執行緒池中的某乙個睡眠執行緒,讓它來處理這個任務,當處理完這個任務,執行緒又處於睡眠狀態。

submit任務後會返回乙個期程(future),這個物件可以檢視執行緒池中執行此任務的執行緒是否仍在處理中

因此可以構建乙個全域性視覺化執行緒池:

from concurrent.futures.thread import threadpoolexecutor

class threadpool(object):

def __init__(self):

# 執行緒池

self.executor = threadpoolexecutor(20)

# 用於儲存每個專案批量任務的期程

self.future_dict = {}

# 檢查某個專案是否有正在執行的批量任務

def is_project_thread_running(self, project_id):

future = self.future_dict.get(project_id, none)

if future and future.running():

# 存在正在執行的批量任務

return true

return false

# 展示所有的非同步任務

def check_future(self):

data = {}

for project_id, future in self.future_dict.items():

data[project_id] = future.running()

return data

def __del__(self):

self.executor.shutdown()

# 主線程中的全域性執行緒池

# global_thread_pool的生命週期是django主線程執行的生命週期

global_thread_pool = threadpool()

使用:

# 檢查非同步任務

if global_thread_pool.is_project_thread_running(project_id):

raise exceptions.validationerror(detail='存在正在處理的批量任務,請稍後重試')

# 提交乙個非同步任務

future = global_thread_pool.executor.submit(self.batch_thread, project_id)

global_thread_pool.future_dict[project_id] = future

# 檢視所有非同步任務

@login_required

def check_future(request):

data = global_thread_pool.check_future()

return httpresponse(status=status.http_200_ok, content=json.dumps(data))

使用執行緒鎖

在全域性執行緒池中初始化執行緒鎖

class threadpool(object):

def __init__(self):

self.executor = threadpoolexecutor(20)

self.future_dict = {}

self.lock = threading.lock()

然後執行執行緒前需要獲取鎖並再執行結束後釋放鎖

def batch_thread(self):

global_thread_pool.lock.acquire()

try:

...global_thread_pool.lock.release()

except exception:

trace_log = traceback.format_exc()

logger.error('非同步任務執行失敗:\n %s' % trace_log)

global_thread_pool.lock.release()

需要捕捉異常預防子執行緒出錯而無法釋放鎖的情況

由於django的資料庫連線是儲存到執行緒本地變數中的,通過threadpoolexecutor建立的執行緒會儲存各自的資料庫連線。

當連線被儲存的時間超過mysql連線的最大超時時間,連線失效,但不會被執行緒釋放。

之後再調起執行緒執行涉及到資料庫操作的非同步任務時,會用到失效的資料庫連線,導致報錯「mysql server has gone away」。

def batch_thread(self):

for conn in connections.all():

conn.close_if_unusable_or_obsolete()

...

spring boot 非同步執行緒池

在專案中,有乙個非同步方法 async註解。當多使用者呼叫該非同步方法時,通過日誌跟蹤 發現最多只有兩個執行緒在非同步執行,其它的任務都在等待狀態。非同步配置檔案如下所示,懷疑是corepoolsize影響,故將其修改為5.後來網上發現如下解釋 重點講解 其中比較容易讓人誤解的是 corepools...

day4 19總結 非同步任務和執行緒池

1 asynctask 是乙個工具類,它封裝了 android 中訊息模型的的應用過程,用於簡化訊息傳遞及處理的方式,此類是乙個抽象類,它內部定義的方法有的定義主線程,有的執行在工作執行緒,我們在使用此類時,通常要根據需要重寫其中方法。2 當我們在執行非同步任務時,要構建非同步任務物件,然後呼叫物件...

執行緒池概念,任務

問題 執行緒是寶貴的記憶體資源,單個執行緒約佔1mb空間,過多分配易造成記憶體溢位 頻繁的建立及銷毀執行緒會增加虛擬機器 頻率 資源開銷,造成程式效能下降 執行緒池 執行緒容器,可設定執行緒分配的數量上限 將預先建立的執行緒物件存入池中,並重用執行緒池中的執行緒物件 避免頻繁的建立和銷毀 將任務提交...