實戰非同步IO框架 asyncio 下篇

2021-10-25 06:15:56 字數 4218 閱讀 1470

前面兩節,我們講了協程中的單任務和多工

這節我們將通過乙個小實戰,來對這些內容進行鞏固。

在實戰中,將會用到以下知識點:

在實戰之前,我們要先了解下在asyncio中如何將協程態新增到事件迴圈中的。這是前提。

如何實現呢,有兩種方法:

import time

import asyncio

from queue import queue

from threading import thread

def start_loop(loop):

# 乙個在後台永遠執行的事件迴圈

asyncio.set_event_loop(loop)

loop.run_forever()

def do_sleep(x, queue, msg=""):

time.sleep(x)

queue.put(msg)

queue = queue()

new_loop = asyncio.new_event_loop()

# 定義乙個執行緒,並傳入乙個事件迴圈物件

t = thread(target=start_loop, args=(new_loop,))

t.start()

print(time.ctime())

# 動態新增兩個協程

# 這種方法,在主線程是同步的

new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第乙個")

new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二個")

while true:

msg = queue.get()

print("{} 協程執行完..".format(msg))

print(time.ctime())

由於是同步的,所以總共耗時6+3=9秒.

輸出結果

thu may 31 22:11:16 2018

第乙個 協程執行完..

thu may 31 22:11:22 2018

第二個 協程執行完..

thu may 31 22:11:25 2018

import time

import asyncio

from queue import queue

from threading import thread

def start_loop(loop):

# 乙個在後台永遠執行的事件迴圈

asyncio.set_event_loop(loop)

loop.run_forever()

async def do_sleep(x, queue, msg=""):

await asyncio.sleep(x)

queue.put(msg)

queue = queue()

new_loop = asyncio.new_event_loop()

# 定義乙個執行緒,並傳入乙個事件迴圈物件

t = thread(target=start_loop, args=(new_loop,))

t.start()

print(time.ctime())

# 動態新增兩個協程

# 這種方法,在主線程是非同步的

asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第乙個"), new_loop)

asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二個"), new_loop)

while true:

msg = queue.get()

print("{} 協程執行完..".format(msg))

print(time.ctime())

輸出結果

由於是同步的,所以總共耗時max(6, 3)=6

thu may 31 22:23:35 2018

第二個 協程執行完..

thu may 31 22:23:38 2018

第乙個 協程執行完..

thu may 31 22:23:41 2018

對於併發任務,通常是用生成消費模型,對佇列的處理可以使用類似master-worker的方式,master主要使用者獲取佇列的msg,worker使用者處理訊息。

為了簡單起見,並且協程更適合單執行緒的方式,我們的主線程用來監聽佇列,子執行緒用於處理佇列。這裡使用redis的佇列。主線程中有乙個是無限迴圈,使用者消費佇列。

然後,在當前路徑執行cmd,執行redis的服務端。

一切準備就緒之後,我們就可以執行我們的**了。

import time

import redis

import asyncio

from queue import queue

from threading import thread

def start_loop(loop):

# 乙個在後台永遠執行的事件迴圈

asyncio.set_event_loop(loop)

loop.run_forever()

async def do_sleep(x, queue):

await asyncio.sleep(x)

queue.put("ok")

def get_redis():

connection_pool = redis.connectionpool(host='127.0.0.1', db=0)

return redis.redis(connection_pool=connection_pool)

def consumer():

while true:

task = rcon.rpop("queue")

if not task:

time.sleep(1)

continue

asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)

if __name__ == '__main__':

print(time.ctime())

new_loop = asyncio.new_event_loop()

# 定義乙個執行緒,執行乙個事件迴圈物件,用於實時接收新任務

loop_thread = thread(target=start_loop, args=(new_loop,))

loop_thread.setdaemon(true)

loop_thread.start()

# 建立redis連線

rcon = get_redis()

queue = queue()

# 子執行緒:用於消費佇列訊息,並實時往事件物件容器中新增新任務

consumer_thread = thread(target=consumer)

consumer_thread.setdaemon(true)

consumer_thread.start()

while true:

msg = queue.get()

print("協程執行完..")

稍微講下**

loop_thread:單獨的執行緒,執行著乙個事件物件容器,用於實時接收新任務。consumer_thread:單獨的執行緒,實時接收來自redis的訊息佇列,並實時往事件物件容器中新增新任務。

輸出結果

thu may 31 23:42:48 2018

協程執行完..

協程執行完..

協程執行完..

我們在redis,分別發起了5s,3s,1s的任務。 從結果來看,這三個任務,確實是併發執行的,1s的任務最先結束,三個任務完成總耗時5s

執行後,程式是一直執行在後台的,我們每一次在redis中輸入新值,都會觸發新任務的執行。。

python非同步框架asyncio的使用

python對非同步程式設計有原生的支援,即asyncio標準庫,使用非同步io模型可以節約大量的io等待時間,非常適合於爬蟲任務。import time import asyncio import aiohttp 用非同步方式獲取網頁內容 loop asyncio.get event loop 獲...

Python3 asyncio非同步框架 基礎知識

有些問題 async defrun await asyncio.ensure future p.packing 非阻塞 await asyncio.gather tasks 阻塞先解釋幾個名詞 同步與非同步 同步需要等待io返回的結果,非同步不需要io返回的結果 阻塞與非阻塞 阻塞 程式要等待,非阻...

asyncio 非同步任務

前言 python由於gil 全域性鎖 的存在,不能發揮多核的優勢,其效能一直飽受詬病。然而在io密集型的網路程式設計裡,非同步處理比同步處理能提公升成百上千倍的效率,彌補了python效能方面的短板,如最新的微服務框架japronto,resquests per second可達百萬級。pytho...