Celery分布式任務佇列

2022-07-26 03:33:14 字數 4004 閱讀 7517

celery是乙個簡單、靈活且可靠的,處理大量訊息的分布式系統

專注於實時處理的非同步任務佇列

同時也支援任務排程

celery的架構由三部分組成,訊息中介軟體(message broker),任務執行單元(worker)和任務執行結果儲存(task result store)組成。

訊息中介軟體

celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,rabbitmq, redis等等

任務執行單元

worker是celery提供的任務執行的單元,worker併發的執行在分布式的系統節點中。

任務結果儲存

task result store用來儲存worker執行的任務的結果,celery支援以不同方式儲存任務的結果,包括amqp, redis等

celery version 4.0 runs on

python ❨2.7, 3.4, 3.5❩

pypy ❨5.4, 5.5❩

this is the last version to support python 2.7, and from the next version (celery 5.x) python 3.5 or newer is required.

​if you』re running an older version of python, you need to be running an older version of celery:

​python 2.6: celery series 3.1 or earlier.

python 2.5: celery series 3.0 or earlier.

python 2.4 was celery series 2.2 or earlier.

​celery is a project with minimal funding, so we don』t support microsoft windows. please don』t open any issues related to that platform.

定時任務:定時執行某件事情,比如每天資料統計

pip install celery

訊息中介軟體:rabbitmq/redis

基本使用

建立專案celerytest

import celery

import time

# broker='redis:' 不加密碼

backend='redis:'

broker='redis:'

cel=celery.celery('test',backend=backend,broker=broker)

@cel.task

def add(x,y):

return x+y​​

建立py檔案:add_task.py,新增任務

建立py檔案:result.py,檢視任務執行結果

執行 add_task.py,新增任務,並獲取任務id

執行 result.py,檢查任務狀態並獲取結果

多工結構

pro_cel

├── celery_task# celery相關資料夾

│   ├── celery.py   # celery連線和配置相關檔案,必須叫這個名字

│   └── tasks1.py    # 所有任務函式

│└── tasks2.py    # 所有任務函式

├── check_result.py # 檢查結果

└── send_task.py    # 觸發任務

celery.py

from celery import celery

​cel = celery('celery_demo',

broker='redis:',

backend='redis:',

# 包含以下兩個任務檔案,去相應的py檔案中找任務,對多個任務做分類

include=['celery_task.tasks1',

'celery_task.tasks2'])​

# 時區

cel.conf.timezone = 'asia/shanghai'

# 是否使用utc

cel.conf.enable_utc = false

tasks1.py

import time

from celery_task.celery import cel

​@cel.task

def test_celery(res):

time.sleep(5)

return "test_celery任務結果:%s"%res

tasks2.py

import time

from celery_task.celery import cel

@cel.task

def test_celery2(res):

time.sleep(5)

return "test_celery2任務結果:%s"%res

check_result.py

send_task.py

from celery_task.tasks1 import test_celery

from celery_task.tasks2 import test_celery2

​# 立即告知celery去執行test_celery任務,並傳入乙個引數

result = test_celery.delay('第乙個的執行')

print(result.id)

result = test_celery2.delay('第二個的執行')

print(result.id)

新增任務(執行send_task.py),開啟work:celery worker -a celery_task -l info -p eventlet,檢查任務執行結果(執行check_result.py)

設定時間讓celery執行乙個任務

add_task.py

類似於contab的定時任務

多工結構中celery.py修改如下

from datetime import timedelta

from celery import celery

from celery.schedules import crontab

​cel = celery('tasks', broker='redis:', backend='redis:', include=[

'celery_task.tasks1',

'celery_task.tasks2',

])cel.conf.timezone = 'asia/shanghai'

cel.conf.enable_utc = false

​cel.conf.beat_schedule = ,

# 'add-every-12-seconds': ,

}

啟動乙個beat:celery beat -a celery_task -l info

啟動work執行:celery worker -a celery_task -l info -p eventlet

安裝包

celery==3.1.25

django-celery==3.1.20

在專案目錄下建立celeryconfig.py

from celery import task

@task

def add(a,b):

with open('a.text', 'a', encoding='utf-8') as f:

f.write('a')

print(a+b)

檢視函式views.py

settings.py

分布式任務佇列Celery

celery 芹菜 是基於python開發的分布式任務佇列。它支援使用任務佇列的方式在分布的機器 程序 執行緒上執行任務排程。基本用法是在程式裡引用celery,並將函式方法繫結到task from celery import celery def add x,y return x y from t...

Celery分布式任務佇列

celery是一款非常簡單,靈活,可靠的分布式系統,可用於處理大量訊息,並且提供了一整套操作此系統的一系列工具 celery是一款訊息佇列工具,可用於處理實時資料以及任務排程 什麼是任務佇列?任務佇列一般用於執行緒或計算機之間分配工作的一種機制 任務佇列的輸入是乙個成為任務的工作單元,有專門的職稱 ...

關於Celery 分布式任務佇列

celery 是 distributed task queue,分布式任務佇列,分布式決定了可以有多個 worker 的存在,佇列表示其是非同步操作,即存在乙個產生任務提出需求的工頭,和一群等著被分配工作的碼農。在 python 中定義 celery 的時候,我們要引入 broker,中文翻譯過來就...