celery多個佇列優先順序實現

2021-07-25 06:51:21 字數 3777 閱讀 5450

celery 是乙個簡單、靈活且可靠的,處理大量訊息的分布式系統,並且提供維護這樣乙個系統的必需工具。

它是乙個專注於實時處理的任務佇列,同時也支援任務排程。

優先順序佇列問題:

celery 官方給出的是不支援優先順序佇列:

(建議看看這個論壇)

或者:

celery中broker 常用的是rabbitmq,rabbitmq 3.5版本之後支援優先順序佇列,訊息具有不同的優先順序,同乙個佇列中優先順序高的先得到處理。rabbitmq 支援這種特性的話,celery就可以實現優先順序佇列,畢竟celery是從broker中獲取msg的。

celery還可以通過rabbitmq中的consumer priorities 的特性支援佇列之間的優先順序。a和b兩個佇列,a佇列的consumer優先順序位10,b佇列的consumer優先順序位1.那麼之後當a佇列的consumer都處於阻塞狀態的時候,b佇列的consumer才能從rabbitmq中獲取消費資訊。

from kombu import exchange, queue

celery_acks_late = true

celery_ignore_result = true

celery_disable_rate_limits = true

broker_transport_options =

worker_max_memory_per_child = 300

celery_queues = (

queue('analyse', routing_key='analyse',consumer_arguments=),

queue('transcode', routing_key='transcode',consumer_arguments=),

queue('download', routing_key='download',consumer_arguments=),

)consumer_arguments=   數字越大,優先順序越高

在使用python celery分布式框架時,有時候會遇到緊急事務處理,但是當佇列很長時,celery也沒有隊首插入task的api,那麼這個時候可以採用一種變相的方法來實現優先順序佇列。

【需求背景】

對於非同步任務處理,相信很多人首選celery,的確,celery處理非同步任務非常強悍,使用簡單,支援各種併發。但是,大家來看看我所遇到的乙個應用場景:每次後台上傳乙個遊戲母包,然後對這個母包處理(新增某種標識,比如id)生成多個遊戲子包,其中有一些id號的包是要求盡快的處理的,剩下的可以閒時處理。這裡就對要把乙個母包分成兩個任務來處理,其中乙個是優先處理的,另乙個是閒時處理。

【方案初探】

對於上面的場景,最先想到的方案是,把每個母包處理任務分成優先和閒時兩個celery任務佇列分別處理,分別單獨配給cpu資源(土豪的話給多一台機器也行)專門處理。大家估計也想到這種做法的弊端了,這樣無法有效使用資源,當優先任務佇列沒有任務時,閒時任務佇列卻滿載,顯然這種設計方案不是很好。

那麼有沒有更好的處理方案呢?試想如果任務可以按優先級別在佇列中排隊就好了。顯然celery並沒有提供優先佇列這種機制,那麼我們只能自己實現乙個celery一樣的非同步事件佇列,並且支援優先順序的佇列。這時候顯然想到的是redis。

【redis優先佇列】

redis中提供了blpop,rpush(rlpop,lpsuh)這些佇列操作。

來看看bloop的介紹:

blpop key [key …] timeout

blpop 是列表的阻塞式(blocking)彈出原語。

它是 lpop 命令的阻塞版本,當給定列表內沒有任何元素可供彈出的時候,連線將被 blpop命令阻塞,直到等待超時,或有另乙個客戶端對給定 key 的任意乙個執行 lpush 或 rpush 命令為止。

當給定多個 key 引數時,按引數 key的先後順序依次檢查各個列表,彈出第乙個非空列表的頭元素(這是就是實現優先順序的關鍵)。

那麼我們可以設定兩個key,乙個表示優先任務的key,姑且叫priority_task,另乙個閒時任務的key,就叫normal_task。在新增任務時,把對應任務所要必備引數新增的對應的key值佇列即可。具體如下:

priority_task = 

redis.rpush('priority_task', json.dumps(priority_task))

normal_task =

redis.rpush('normal_task', json.dumps(normal_task))

成功入隊後,接下來就是不斷從佇列中取出任務,然後對應處理,大概**如下:

while 1:

# 監聽任務,沒有打包任務則阻塞

key, task = redis.blpop(['priority_task', 'normal_task'])

deal_task(key, json.loads(task))

【任務動態切換】

上面實現保證了每次從佇列取出的任務都是優先級別最高的,但是存在著問題,比如當前正在處理閒時任務,可是這個閒時任務可能要處理200+個包,這時候佇列中又來了乙個優先任務,那麼這個優先任務必須等待之前的閒時任務處理完成才能開始處理,這顯然不是我們想要的,那麼我們能掛起當前正在處理的閒時任務,先去處理優先任務嗎。顯然是可以的,就是乙個最簡單的協程:函式呼叫。只需要在閒時任務處理完每個子包後,檢查優先任務佇列是否有元素,有則呼叫函式先處理優先任務,等優先任務完成後,再繼續處理閒時任務。

處理函式大概如下:

def deal_task(key, task):  # 任務處理函式

id_list = task['id_list'] # 要生成的子包id

for id in id_list:

do something.... # 生成對應的id子包

if key == 'normal_key': # 如果當前是閒時任務

while redis.llen('priority_key') > 0: # 檢查是否有優先任務,有則獲取並執行

priority_task = redis.lpop('priority_key')

if priority_task:

deal_task('priority_key', json.loads(priority_task)) # 執行優先任務處理

以上就是實現乙個單程序處理非同步優先任務佇列的全過程。

【多程序化】

上面實現都是單程序處理的,為了提公升處理效率,我們可以開多個程序提公升併發量,這裡建議使用supervisor來管理你的這些程序。這裡需要注意:

多程序處理臨界資源,如果沒有相關臨界資源的競爭那最好,如果有,那麼你必須考慮怎麼處理,一般是用佇列順序化。

supervisor持久化程序資料庫鏈結,會導致資料庫雖然已經斷開連線,但是程序並不知曉,當程序再次執行資料庫查詢時就會出錯,mysql一般會報乙個gone away的錯誤。

注:還可以用程序池非同步處理。

優先順序佇列的實現

優先順序佇列 佇列裡面的所有元素都有相應的權值,元素的刪除順序由這些權值決定。優先順序佇列的實現一般用堆來實現其效率比一般的實現要高。要弄清楚堆我們得先弄清楚下面的定義 一顆大根樹 小根樹 是這樣一棵樹,其中每個節點的值都大於 小於 或等於其子節點 如果有子節點的話 的值。大根堆 乙個大根堆 小根堆...

優先順序佇列(堆實現)

一 優先順序佇列定義 二 方法實現 獲得最大元素方法 去掉最大元素方法 修改優先順序方法 新增節點 三 實現 用堆實現乙個優先順序佇列 主要是新增 修改 刪除節點 節點具有唯一性 author hhf 2014年11月28日 public class priorityqueue 返回優先佇列中優先順...

Redis實現優先順序佇列

title redis實現優先順序佇列 tags 基於目前系統中存在部分非同步需求,比如匯入或者新開客戶車輛匹配vin碼等 redis中使用列表作為佇列 最關鍵提供了阻塞版本的指令blpop 新建三個佇列對應高中低優先順序 比如f6car high f6car mid f6 car low 再新建d...