分布式任務佇列 Celery的學習筆記

2022-04-07 17:21:34 字數 3098 閱讀 6574

celery是乙個簡單,靈活,可靠的分布式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具。它是乙個任務佇列,專注於實時處理,同時還支援任務排程。

所謂任務佇列,是乙個邏輯上的概念,可以將抽象中的任務傳送到指定的執行任務的元件,任務佇列可以跨執行緒或機器執行。

celery是基於python開發的分布式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery。

1.高併發的請求任務,比如需要傳送大量請求的網路爬蟲,就可以使用celery來加速爬取。

2.非同步任務,將耗時的操作交給celery來完成,比如傳送/接收郵件、訊息推送等等。

3.定時任務,需要定時執行的程式,比如每天定時執行爬蟲爬取資料。

下圖是我找到的一張表示celery架構的圖:

任務生產者:產生任務並且把任務提交到任務佇列的就是任務生產者。

任務排程beat:celery會根據配置檔案對任務進行調配,可以按一定時間間隔周期性地執行某些任務。

中間人broker:celery使用訊息進行通訊,需要中間人在客戶端和worker之間進行傳遞,接收客戶端傳送過來的任務,並將任務分配給worker。

在celery的文件中,可以找到官方給出的實現broker的工具有:

名稱狀態

監控遠端控制

rabbitmq穩定是

是redis穩定是

是amazon sqs穩定否

否zookeeper

實驗性否

消費者worker:worker是執行任務的單元,在celery任務佇列中屬於消費者。worker會不斷地監聽佇列,一旦有任務新增進來,就會將任務取出來進行執行。worker還可以執行在多台機器上,只要它們都指向同乙個broker就可以。

結果儲存backend:結果儲存backend,顧名思義就是將worker執行後得到的結果儲存起來。celery中有幾個內建的結果儲存可供選擇,包括sqlalchemy / django orm、redis、rabbitmq、mamcached等。

celery4.0版本是支援python2.7的最後乙個版本,所以如果你還在用py2的話,可能要選擇安裝celery3或者更早的版本。我本人用的python版本是python3.7,然後安裝的celery版本是4.3。安裝的話使用pip安裝就好:

pip install celery

1.應用

1

from celery import

celery23

test

", broker="

redis:

", backend="

redis:")

5678

defadd(x, y):

9return x + y

2.執行celery伺服器

在建立好應用之後,就可以使用celery命令執行程式執行worker了:

celery -a test worker -l info

執行後可以看到如下圖:  

有關可用命令列選項的完整列表,執行如下命令:

celery worker --help

3.呼叫任務

要呼叫任務,可以使用delay()方法。

該任務會返回乙個asyncresult例項,可用於查詢任務狀態、獲取任務返回值等。此時檢視前面執行的伺服器,會看到有如下資訊:

received task: test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9]

task test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9] succeeded in 0.006505205000166825s: 5

4.檢視結果

在前面定義的時候,已經選擇使用redis作為結果後端了,所以任務執行後的結果會儲存到redis中。而且,在呼叫任務的時候,還可以進行如下操作:

其中ready()方法會返回該任務是否已經執行,get()方法則會獲取任務返回的結果。

5.配置檔案

由於celery的配置資訊比較多,因此一般會建立乙個配置檔案來儲存這些配置資訊,通常會命名為celeryconfig.py。在test.py所在資料夾下新建配置檔案celeryconfig.py,其中的**如下:

1

#broker(訊息中介軟體來接收和傳送任務訊息)

2 broker_url = '

redis:'3

#backend(儲存worker執行的結果)

4 celery_result_backend = '

redis:'5

6#設定時間參照,不設定預設使用的utc時間

7 celery_timezone = '

asia/shanghai'8

#指定任務的序列化

9 celery_task_serializer = '

json'10

#指定執行結果的序列化

11 celery_result_serializer = '

json

'

然後修改下test.py中的**:

1

from celery import

celery23

test")

celerystudy.celeryconfig")

6789

defadd(x, y):

10return x + y

分布式任務佇列Celery

celery 芹菜 是基於python開發的分布式任務佇列。它支援使用任務佇列的方式在分布的機器 程序 執行緒上執行任務排程。基本用法是在程式裡引用celery,並將函式方法繫結到task from celery import celery def add x,y return x y from t...

Celery分布式任務佇列

celery是乙個簡單 靈活且可靠的,處理大量訊息的分布式系統 專注於實時處理的非同步任務佇列 同時也支援任務排程 celery的架構由三部分組成,訊息中介軟體 message broker 任務執行單元 worker 和任務執行結果儲存 task result store 組成。訊息中介軟體 ce...

Celery分布式任務佇列

celery是一款非常簡單,靈活,可靠的分布式系統,可用於處理大量訊息,並且提供了一整套操作此系統的一系列工具 celery是一款訊息佇列工具,可用於處理實時資料以及任務排程 什麼是任務佇列?任務佇列一般用於執行緒或計算機之間分配工作的一種機制 任務佇列的輸入是乙個成為任務的工作單元,有專門的職稱 ...