celery動態任務元件Demo以及原理

2021-08-17 05:24:29 字數 3178 閱讀 6067

celery是乙個基於python的分布式排程系統,文件在這 ,最近有個需求,想要動態的新增任務而不用重啟celery服務,找了一圈沒找到什麼好辦法(也有可能是文件沒看仔細),所以只能自己實現囉

為celery動態新增任務,首先我想到的是傳遞乙個函式進去,讓某個特定任務去執行這個傳遞過去的函式,就像這樣

def execute(func, *args, **kwargs):

return

func

(*args, **kwargs)

很可惜,會出現這樣的錯誤

kombu.exceptions.encodeerror: object

oftype

'function'

isnot json serializable

換一種序列化方式

def execute(func, *args, **kwargs):

return

func

(*args, **kwargs)

結果又出現一大串錯誤資訊

traceback (most recent call

last):

file

"/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__

return obj.__dict__[self.__name__]

keyerror: 'chord'

during handling of the above exception, another exception occurred:

traceback (most recent call

last):

file

"/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__

return obj.__dict__[self.__name__]

keyerror: '_payload'

換一種思路

func = import_string

(func)

不知道這樣是否可以,結果測試:no

哎,流年不利.

最後一直測試,一直測試,終於找到了一種辦法,直接上**

from importlib import import_module, reload

def import_string(import_name):

import_name = str(import_name).replace(':', '.')

modules = import_name.split('.')

mod = import_module(modules[0])

for comp in modules[1:]:

ifnot hasattr(mod, comp):

reload(mod)

mod = getattr(mod, comp)

return

def execute(func, *args, **kwargs):

func = import_string

(func)

return

func

(*args, **kwargs)

專案結構是這樣的

注意:任務必須大於等於兩層目錄

以後每次新增任務都可以先新增到all_task.py裡,呼叫時不用再重啟celery服務

# task/all_task.py

def ee(c, d):

return c, d, '你好'

# example

execute.delay('task.all_task.ee', 2, 444) [ execute.delay('task.all_task.ee', 2, 444)]

ok,另外發現celery也支援任務定時呼叫,就像這樣

簡單實現乙個任務重複呼叫的功能

def interval(func, seconds, args=(), task_id=none):

next_run_time = current_time() + timedelta(seconds=seconds)

kwargs = dict(args=(func, seconds, args), eta=next_run_time)

if task_id is not none:

kwargs.update(task_id=task_id)

func = import_string

(func)

return

func

(*args)

大概意思就是先計算下次執行的時間,然後把任務新增到celery佇列裡,這裡有個task_id有些問題,因為假設新增了每隔3s執行乙個任務,

它的task_id缺省會使用uuid生成,如果想要再移除這個任務就不太方便,自定task_id可能會好一些,另外也許需要判斷task_id是否存在

asyncresult(task_id).state
ok,再獻上乙個好用的函式

from inspect import getmembers, isfunction

def get_tasks(module='task'):

return ['.format(f[1].__name__),

'doc': f[1].__

doc__,

} for f in getmembers(import

_module(module), isfunction)] [ from inspect import getmembers, isfunction def get_tasks(module='task'): return ['.format(f[1].__name__), 'doc': f[1].__doc__, } for f in getmembers(import_module(module), isfunction)]]

就這樣.

Celery任務佇列

使用任務佇列作為分發任務的機制。乙個任務佇列的輸入是一組被稱為任務的工作單元。專用的工人會持續監聽任務佇列來等待完成新的工作。celery通過訊息進行通訊,通常使用中間人作為客戶端和工人 workers 間的媒介。為了初始化一項任務,客戶端會新增一條訊息到佇列中,然後中間人傳遞這條訊息給乙個work...

celery 任務模組

每天不知道忙啥,到了這個點才開始學習 1.新建python檔案 from future import absolute import 絕對路徑的匯入 from celery import celery from django.conf import settings import os 設定系統的環...

celery 執行celery定時任務

場景 在虛擬機器上執行 python django celery redis 的定時任務 可能遇到的問題 如果在執行過程中,定時任務突然退出,並報以下錯誤,錯誤顯示,沒有許可權訪問一些目錄檔案 解決方案 1 關閉當前redis服務 在step 3中有描述如何關閉 2 以root使用者執行啟動redi...