celery非同步框架

2022-05-06 20:39:08 字數 3920 閱讀 4618

官方

celery 官網:

celery 官方文件英文版:

celery 官方文件中文版:

celery的架構由三部分組成,訊息中介軟體(message broker)、任務執行單元(worker)和 任務執行結果儲存(task result store)組成。

訊息中介軟體

celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,rabbitmq, redis等等

任務執行單元

worker是celery提供的任務執行的單元,worker併發的執行在分布式的系統節點中。

任務結果儲存

task result store用來儲存worker執行的任務的結果,celery支援以不同方式儲存任務的結果,包括amqp, redis等

延遲執行:解決延遲任務 --- 延遲傳送右鍵

定時執行:解決週期任務 --- 定時更新快取

#

如果celery物件:celery(...)是放在乙個模組下的,

#1)終端切換到該模組所在資料夾位置,如scripts

#2) 執行啟動worker的命令: celery worker -a 模組名 -l info -p eventlet

#注:windows系統需要eventlet支援,linux與macos直接執行:celery worker -a 模組名 -l info

#注:模組名隨意

#如果celery物件:celery(...)是放在乙個包下的,

#1)必須在這個包下建乙個規定名字為celery.py的檔案,將celery(...)產生物件的語句放在該檔案中

#2) 執行啟動worker的命令: celery worker -a 包名 -l info -p eventlet

#注:windows系統需要eventlet支援,linux與macos直接執行:celery worker -a 包名 -l info

#注:包名隨意

from celery import

celery

傳入必要引數

#

tt_celey是包

from celery import

celery

傳入必要引數

project

├── celery_task

#celery包

│ ├── __init__.py #

包檔案 │ ├── celery.py #

celery連線和配置相關檔案,且名字必須交celery.py

│ └── tasks.py #

所有任務函式

├── add_task.py #

新增任務

└── get_result.py #

獲取結果

scripts/celery_task_1/celery.py

from celery import

celery

#連線redis: 'redis://:密碼@伺服器ip:埠/資料庫編號'

broker = '

redis:

'backend = '

redis:'#

worker配置

celery_task_1.tasks'])

#時區asia/shanghai'#

是否使用utc##

定時任務的配置

''' '自定義定時任務名':

}'''

#beat配置

from datetime import

timedelta

from celery.schedules import

crontab

'test-task':

}

scripts/celery_task_1/tasks.py

from .celery import

deftest_task(data):

print('

該方法就是任務,任務被執行了,傳入的引數:%s

' %data)

return

'該內容就是任務結果

'def

add(n1, n2):

r = n1 +n2

print('

%s + %s = %s

' %(n1, n2, r))

return r

scripts/add_task.py

#

總結:該檔案一定要獨立開celery封裝的包

#原因:比如celery有乙個更新輪播圖快取的任務,django專案是可以響應前台或後台使用者主動更新輪播圖資料庫的資料,

#當使用者更新了資料庫資料,就可以執行一下**,通知celery可以去非同步執行更新輪播圖快取的任務了

#右鍵執行該檔案,下面帶導包是合理的

from celery_task_1.tasks import

test_task

#直接匯入函式,呼叫函式,和celery沒有任何關係

#test_task(666)

#要將任務交給celery來執行##

t1 = test_task.delay(666) # 返回值是任務物件,直接輸出代表任務唯一標識:id

#print(t1.id)

#2)延遲任務(達到設定的延遲時間後再去非同步執行) => 定時傳送郵件

from datetime import

datetime, timedelta

eta = datetime.utcnow() + timedelta(seconds=10)##

print(t2.id)

from celery_task_1.tasks import

add#

print

(t3.id)

#3)定時任務(在worker服務以外,再啟動乙個beat服務,定時幫我們自動新增任務) => 定時更新輪播圖

#i)在celery中配置好beat_schedule的配置後,執行命令啟動定時新增任務服務

#>: celery beat -a 包名|模組名 -l info

scripts/get_task_result.py

#

獲取任務結果也是專案正常邏輯來呼叫的

from celery.result import

asyncresult

from celery_task_1.celery import

id = '

fad4b75c-c168-443a-be7c-7696b3233295'if

__name__ == '

__main__':

iftask_result.successful():

result =task_result.get()

print

(result)

elif

task_result.failed():

print('

任務失敗')

elif task_result.status == '

pending':

print('

任務等待中被執行')

elif task_result.status == '

retry':

print('

任務異常後正在重試')

elif task_result.status == '

started':

print('

任務已經開始被執行

')

python非同步任務處理框架 celery

celery 是一款非常簡單 靈活 可靠的分布式系統,可用於處理大量訊息,並且提供了一整套操作此系統的一系列工具,同時celery 是一款訊息佇列工具,可用於處理實時資料以及任務排程。比方說現在站點註冊需要在使用者註冊完成後傳送啟用郵件給使用者,而後台傳送郵件時間需要一定時間,而又不能同步等待郵件傳...

celery 非同步通訊

celery 由 broker 中間人 client 任務的發出者 和worker 任務的處理者 三部分組成 client 發出訊息到佇列中,broker將佇列中的資訊派發給worker處理 乙個celery系統可以包含很多的worker 和broker,可增強橫向擴充套件性和高可用性能 1.建立c...

celery非同步分布式框架使用一

一 celery簡介 1 celery 分布式任務佇列 2 celery 是一款非常簡單 靈活 可靠的分布式系統,可用於處理大量訊息,並且提供了一整套操作此系統的一系列工具。3 celery 是一款訊息佇列工具,可用於處理實時資料以及任務排程。4 celery 通過訊息機制進行通訊,通常使用中間人 ...