分布式任務佇列Celery

2021-09-19 04:05:59 字數 3694 閱讀 5972

celery (芹菜)是基於python開發的分布式任務佇列。它支援使用任務佇列的方式在分布的機器/程序/執行緒上執行任務排程。

基本用法是在程式裡引用celery,並將函式方法繫結到task

from celery import celery

def add(x, y):

return x + y

from tasks import add

import time

result = add.delay(4,4)

while not result.ready():

print "not ready yet"

time.sleep(5)

print result.get()

由於是採用訊息佇列,因此任務提交之後,程式立刻返回乙個任務id。

之後可以通過該id查詢該任務的執行狀態和結果。

執行1個任務,完成後再執行第2個,第乙個任務的結果做第二個任務的入參

結果:2+2+16=20

還可以做錯誤處理

def error_handler(self, uuid):

print('task raised exception: \n'.format(

uuid, result.result, result.traceback))

讓任務在指定的時間執行,與下文敘述的週期性任務是不同的。

from datetime import datetime, timedelta

tomorrow = datetime.utcnow() + timedelta(seconds=3)

tip

expires單位秒,超過過期時間還未開始執行的任務會被**

將任務結果按照一定格式序列化處理,支援pickle, json, yaml and msgpack

將任務結果壓縮

使用-q引數為佇列(queue)命名,然後呼叫任務時可以指定相應佇列

$ celery -a proj worker -l info -q celery,priority.high
按照一定關係一次呼叫多個任務

from celery import group

>>> res = group(add.s(i, i) for i in xrange(10))()

>>> res.get(timeout=1)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

>>> from celery import chain

# 2 + 2 + 4 + 8

>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()

>>> res.get()

16可以用|來表示chain

# ((4 + 16) * 2 + 4) * 8

>>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))

>>> res = c2()

>>> res.get()

>>> from celery import chord

#1*2+2*2+...9*2

>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()

>>> res.get()

90

>>> from proj.tasks import add

>>> ~xsum.map([range(10), range(100)])

[45, 4950]

>>> ~add.starmap(zip(range(10), range(10)))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()

>>> res.get()

[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],

[20, 22, 24, 26, 28, 30, 32, 34, 36, 38],

[40, 42, 44, 46, 48, 50, 52, 54, 56, 58],

[60, 62, 64, 66, 68, 70, 72, 74, 76, 78],

[80, 82, 84, 86, 88, 90, 92, 94, 96, 98],

[100, 102, 104, 106, 108, 110, 112, 114, 116, 118],

[120, 122, 124, 126, 128, 130, 132, 134, 136, 138],

[140, 142, 144, 146, 148, 150, 152, 154, 156, 158],

[160, 162, 164, 166, 168, 170, 172, 174, 176, 178],

[180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

週期性任務就是按照一定的時間檢查反覆執行的任務。前面描述的定時任務值的是一次性的任務。

程式中引入並配置好週期性任務後,beat程序就會定期調起相關任務

beat程序是需要單獨啟動的

$ celery -a proj beat
或者在worker啟動時一起拉起

$ celery -a proj worker -b
注意一套celery只能啟乙個beat程序

由於python中時間預設是utc時間,因此最簡便的方法是celery也用utc時區

celery_timezone = 'utc'
這麼配置可以保證任務排程的時間是準確的,但由於伺服器一般都配置時區,因此flower、以及日誌中的時間可能會有偏差

另外一種方法,就是配置正確的時區

celery_timezone = 'asia/shanghai'
然後任務調起時,將時間帶入時區配置

format_eta = local_tz.localize(datetime.strptime(eta.strip(), '%y/%m/%d %h:%m:%s'))

from datetime import timedelta

celerybeat_schedule = ,

}

from celery.schedules import crontab

celerybeat_schedule = ,

}

本文參考官方文件

Celery分布式任務佇列

celery是乙個簡單 靈活且可靠的,處理大量訊息的分布式系統 專注於實時處理的非同步任務佇列 同時也支援任務排程 celery的架構由三部分組成,訊息中介軟體 message broker 任務執行單元 worker 和任務執行結果儲存 task result store 組成。訊息中介軟體 ce...

Celery分布式任務佇列

celery是一款非常簡單,靈活,可靠的分布式系統,可用於處理大量訊息,並且提供了一整套操作此系統的一系列工具 celery是一款訊息佇列工具,可用於處理實時資料以及任務排程 什麼是任務佇列?任務佇列一般用於執行緒或計算機之間分配工作的一種機制 任務佇列的輸入是乙個成為任務的工作單元,有專門的職稱 ...

關於Celery 分布式任務佇列

celery 是 distributed task queue,分布式任務佇列,分布式決定了可以有多個 worker 的存在,佇列表示其是非同步操作,即存在乙個產生任務提出需求的工頭,和一群等著被分配工作的碼農。在 python 中定義 celery 的時候,我們要引入 broker,中文翻譯過來就...