python celery 多work多佇列

2021-08-20 07:28:43 字數 3206 閱讀 7198

1.celery模組呼叫既然celery是乙個分布式的任務排程模組,那麼celery是如何和分布式掛鉤呢,celery可以支援多台不通的計算機執行不同的任務或者相同的任務。

如果要說celery的分布式應用的話,就要提到celery的訊息路由機制,amqp協議。具體的可以檢視amqp的文件。簡單地說就是可以有多個訊息佇列(message queue),不同的訊息可以指定傳送給不同的message queue,而這是通過exchange來實現的。傳送訊息到message queue中時,可以指定routiing_key,exchange通過routing_key來把訊息路由(routes)到不同的message queue中去。

多worker,多佇列,例項:

1.在伺服器上編寫檔案tasks.py。首先定義乙個celery的物件,然後通過celeryconfig.py對celery物件進行設定。之後又分別定義了三個task,分別是taska, taskb和add。

#!/usr/bin/env

#-*-conding:utf-8-*-

from celery import celery,platforms

platforms.c_force_root = true

def tasha(x,y):

return x*y

def taskb(x,y,z):

return x+y+z

def add(x,y):

return x+y

2.編寫celeryconfig.py檔案。

#!/usr/bin/env python

#-*- coding:utf-8 -*-

from kombu import exchange,queue

from celery import platforms

platforms.c_force_root = true

broker_url = "redis://localhost:6379/7" 

celery_result_backend = "redis://localhost:6379/8"

celery_queues = (

queue("default",exchange("default"),routing_key="default"),

queue("for_task_a",exchange("for_task_a"),routing_key="for_task_a"),

queue("for_task_b",exchange("for_task_b"),routing_key="for_task_b") 

)celery_routes = ,

'tasks.taskb':

}

3.啟動worker來指定task

4.傳入引數

將上面兩個檔案匯出到pycharm中:

編寫檔案傳參:

from tasks import *

re1 = taska.delay(100, 200)

re2 = taskb.delay(1,2, 3)

print(re3.status) #檢視re3的狀態

print(re3.id)               #檢視re3的id

執行之後可見:taska,taskb都已正常執行。

5.我們可以看到add(re3)的狀態是pending,表示沒有執行,這個是因為沒有celeryconfig.py檔案中指定改route到哪乙個queue中,所以會被發動到預設的名字celery的queue中,但是我們還沒有啟動worker執行celery中的任務。下面,我們來啟動乙個worker來執行celery佇列中的任務。

這樣我們再次執行pycharm就可以看見add也被執行了,並且redis資料庫中也有該id了。

1.在celery中執行定時任務非常簡單,只需要設定celery物件中的celerybeat_schedule屬性即可。

下面我們接著在celeryconfig.py中新增celerybeat_schedule變數:

celery_timezone = 'utc'

celerybeat_schedule = ,

'taskb_scheduler' : ,

'add_schedule':

}

2.celery啟動定時任務

celery -a tasks worker -l info -n workera.%h -q for_task_a -b

啟動完成後:

taska每20秒執行一次taska.delay(5, 6)

taskb每200秒執行一次taskb.delay(10, 20, 30)

celery每10秒執行一次add.delay(1, 2)

可以自行觀察下。

python celery學習筆記

專案中需要非同步執行某個任務,且失敗的時候需要重試,且需要知道是否執行成功,可以這樣設計。案例 如下 task def test task args,kwargs 處理邏輯 return truedef home request from celery tasks import test task ...

python celery 任務排程器

celery是python開發的分布式任務排程模組,今天抽空看了一下,果然介面簡單,開發容易,5分鐘就寫出了乙個非同步傳送郵件的服務。celery本身不含訊息服務,它使用第三方訊息服務來傳遞任務,目前,celery支援的訊息服務有rabbitmq redis甚至是資料庫,當然redis應該是最佳選擇...

Python celery 任務例項

coding utf 8 celery 任務示例 本地啟動celery命令 python manage.py celery worker settings settings 週期性任務還需要啟動celery排程命令 python manage.py celerybeat settings setti...