python基於concurrent模組實現多執行緒

2022-09-25 22:24:12 字數 4172 閱讀 1301

之前也寫過多執行緒的部落格,用的是 threading ,今天來講下 python 的另外乙個自帶庫 concurrent 。concurrent 是在 python3.2 中引入的,只用幾行**就可以編寫出執行緒池/程序池,並且計算型任務效率和 mutiprocessing.pool 提供的 poll 和 threadpoll 相比不分伯仲,而且在 io 型任務由於引入了 future 的概念效率要高數倍。而 threading 的話還要自己維護相關的佇列防止死鎖,**的可讀性也會下降,相反 concurrent 提供的執行緒池卻非常的便捷,不用自己操心死鎖以及編寫執行緒池**,由於非同步的概念 io 型任務也更有優勢。

concurrent 的確很好用,主要提供了 threadpoolexecutor 和 processpoolexecutor 。乙個多執行緒,乙個多程序。但 concurrent 本質上都是對 threading 和 mutiprocessing 的封裝。看它的原始碼可以知道,所以最底層並沒有非同步。

threadpoolexecutor xkywx自己提供了任務佇列,不需要自己寫了。而所謂的執行緒池,它只是簡單的比較當前的 threads 數量和定義的 max_workers 的大小,小於 max_workers 就允許任務建立執行緒執行任務。

通過 threadpoolexecutor 類建立執行緒池物件,max_workers 設定最大執行執行緒數數。使用 threadpoolexecutor 的好處是不用擔心執行緒死鎖問題,讓多執行緒程式設計更簡潔。

from concurrent import futures

pool = futures.threadpoolexecutor(max_workers = 2)

submit(self, fn, *args, **kwargs):

該方法的作用就是提交乙個可執行的**task,它返回乙個future物件。可以看出此方法不會阻塞主線程的執行。

import requests,datetime,time

from concurrent import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['','','']

pool = futures.threadpoolexecutor(max_workers = 2)

for url in urls:

task = pool.submit(get_request,url)

print('{}主線程'.format(datetime.datetime.now()))

time.sleep(2)

# 輸出結果

2021-03-12 15:29:10.780141:主線程

2021-03-12 15:29:10.865425: 200

2021-03-12 15:29:10.923062: 200

2021-03-12 15:29:10.940930: 200

map(self, fn, *iterables, timeout=none, chunksize=1):

map 第二個引數是可迭代物件,比如 list、tuple 等,寫法相對簡單。map 方法也不會阻塞主線程的執行。

import requests,datetime,time

from c import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['','','']

pool = futures.threadpoolexecutor(max_workers = 2)

tasks = pool.map(get_request,urls)

print('{}:主線程程式設計客棧'.format(datetime.datetime.now()))

time.sleep(2)

# 輸出結果

2021-03-12 16:14:04.854452:主線程

2021-03-12 16:14:04.938870: 200

2021-03-12 16:14:05.033849: 200

2021-03-12 16:14:05.048952: 200

如果要等待子執行緒執行完之後再執行主線程要怎麼辦呢,可以通過 wait 。

wait(fs, timeout=none, return_when=all_completed):

import requests,datetime,time

from concurrent import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['','','']

pool = futures.threadpoolexecutor(max_workers = 2)

tasks =

for url in urls:

task = pool.submit(get_request,url)

tasks.append(task)

futures.wait(tasks)

print('{}:主線程'.format(datetime.datetime.now()))

time.sleep(2)

# 輸出結果

2021-03-12 16:30:13.437042: 200

2021-03-12 16:30:13.552700: 200

2021-03-12 16:30:14.117325: 200

2021-03-12 16:30:14.118284:主線程

as_completed(fs, timeout=none)

使用 concurrent.futures 操作 多執行緒/多程序 過程中,很多函式報錯並不會直接終止程式,而是什麼都沒發生。使用 as_completed 可以捕獲異常,**如下

# 建立執行緒池

pool = futures.threadpoolexecutor(max_workers = 2)

tasks =

for url in urls:

task = pool.submit(get_request,url)

tasks.append(task)

# 異常捕獲

errors = futures.as_completed(tasks)

for error in errors:

# error.result() 等待子執行緒都完成,並丟擲異常,中斷主線程

# 捕獲子執行緒異常,不會終止主線程繼續執行

print(error.exception())

futures.wait(tasks)

print('{}:主線程'.format(datetime.datetime.now()))

time.sleep(2)

# 輸出結果

2021-03-12 17:24:26.994937:主線程

多程序程式設計也類似,將 threadpoolexecutor 替換成 processpoolexecutor 。

基於python的爬蟲

本次初學,參考的資料見 功能主要是抓取韓寒的部落格內容,以及儲存 到 hanhan的資料夾中,執行環境實在linux下的。見 具體 如何 usr bin env python coding utf 8 import urllib import time url 60 con urllib.urlop...

基於Python操作ElasticSearch

python 2.7 es依賴包 pyelasticsearch elasticsearch 5.5.1 6.0.1 作業系統 windows 10 centos 7 本文主要就es基本的crud操作做以歸納整理,es官方對python的依賴支援有很多,eg pyelasticsearch escl...

基於Python操作ElasticSearch

python 2.7 es依賴包 pyelasticsearch elasticsearch 5.5.1 6.0.1 作業系統 windows 10 centos 7 本文主要就es基本的crud操作做以歸納整理,es官方對python的依賴支援有很多,eg pyelasticsearch escl...