Python中的非同步任務佇列 arq

2021-10-09 13:11:11 字數 3802 閱讀 6064

引言

最近在用 sanic 寫東西,所有涉及到io阻塞的**都需要用 aio 的模組,好在近年來 asyncio 生態圈發展的還算不錯,該有的都有 ~

近期業務中 登入/註冊 業務涉及的很複雜(涉及到邀請),需要解鎖、傳送簡訊等操作,想來這麼個模組整的很繁瑣,以後加個滑動驗證那還了得。

於是乎,想整乙個類似於celery 的模組,進行任務解耦,但是目前 celery 還目前不支援非同步(官方將在 celery5 支援非同步)。

所以目前查閱資料發現了乙個 python 實現的 arq 模組,已經應用在了生產環境,效果還算不錯 ~

官方是這麼介紹它的:

首先先安裝一下它:

$ pip install arq
那麼接下來,快速了解下它的使用吧 ~

先看下面編寫的這段**

import asyncio

from arq import create_pool

from arq.connections import redissettings

''''''

async

defsay_hello

(ctx, name)

->

none

:"""任務函式

parameters

----------

ctx: dict

工作者上下文

name: string

returns

-------

dict

"""print

(ctx)

print

(f"hello "

)async

defstartup

(ctx)

:print

("starting..."

)async

defshutdown

(ctx)

:print

("ending..."

)async

defmain()

:# 建立

redis =

await create_pool(redissettings(password=

"root123456"))

# 分配任務

await redis.enqueue_job(

'say_hello'

, name=

"liuzhichao"

)# workersettings定義了建立工作時要使用的設定,

# 它被arq cli使用

class

workersettings

:# 佇列使用 `redis` 配置, 可以配置相關引數

# 例如我的密碼是 `rooot123456`

redis_settings = redissettings(password=

"root123456"

)# 被監聽的函式

functions =

[say_hello]

# 開啟 `worker` 執行

on_startup = startup

# 終止 `worker` 後執行

on_shutdown = shutdown

if __name__ ==

'__main__'

: loop = asyncio.get_event_loop(

) loop.run_until_complete(main(

))

1、接下來看我們怎麼執行它

$ arq tasks.workersettings

maybe you can see 10:

56:25: starting worker for

1 functions: say_hello10:

56:25: redis_version=

4.0.1 mem_usage=

32.00m clients_connected=

6 db_keys=

19189

starting.

..

2、執行 tasks.py 檔案

$ python3 tasks.py

maybe you can see 11:

01:04:

0.29s → 5a5ac0edd5ad4b318b9848637b1ae800

:say_hello(name=

'liuzhichao'

)hello liuzhichao11:

01:04:

0.00s ← 5a5ac0edd5ad4b318b9848637b1ae800

:say_hello ●

3、那麼這個簡單任務就執行完成了,是不是特別簡單 ~

定時任務

#! /usr/bin/env python

# -*- coding: utf-8 -*-

''''''

from arq import cron

from arq.connections import redissettings

async

defrun_regularly

(ctx)

:# 表示在 10、11、12 分 50秒的時候列印

print

('run job at 26:05, 27:05 and 28:05'

)class

workersettings

: redis_settings = redissettings(password=

"root123456"

) cron_jobs =

[ cron(run_regularly, minute=

, second=50)

]

1、執行它

$ arq tasks.workersettings

if run out of the time,maybe you can see11:

10:25: starting worker for

1 functions: cron:run_regularly11:

10:25: redis_version=

4.0.1 mem_usage=

32.00m clients_connected=

6 db_keys=

1919011:

10:51:

0.51s → cron:run_regularly(

)run foo job at 26:05

,27:05

and28:05

11:10:

51:0.00s ← cron:run_regularly ● 11:

11:51:

0.51s → cron:run_regularly(

)run foo job at 26:05

,27:05

and28:05

11:11:

51:0.00s ← cron:run_regularly ● 11:

12:50:

0.50s → cron:run_regularly(

)run foo job at 26:05

,27:05

and28:05

11:12:

50:0.00s ← cron:run_regularly ●

按照此時間線,然後會一直進行無限迴圈下去

python非同步任務佇列示例

很多場景為了不阻塞,都需要非同步 機制。這是乙個簡單的例子,大家參考使用吧 複製 如下 usr bin env python coding utf 8 import logging import queue import threading def func a a,b return a b def...

celery 非同步任務佇列

celery是基於python開發的分布式任務佇列。它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務 async task 和定時任務 crontab 它的架構組成如下圖 celery 4.x以上版本不安裝該模組,新增任務時會報錯 使用celery包含...

非同步任務佇列Celery在Django中的使用

前段時間在django web平台開發中,碰到一些請求執行的任務時間較長 幾分鐘 為了加快使用者的響應時間,因此決定採用非同步任務的方式在後台執行這些任務。在同事的指引下接觸了celery這個非同步任務佇列框架,鑑於網上關於celery和django結合的文件較少,大部分也只是粗粗介紹了大概的流程,...