用redis實現有優先順序的 celery

2021-07-09 14:37:53 字數 2753 閱讀 6620

[新部落格對應文章]

【需求背景】

對於非同步任務處理,相信很多人首選celery,的確,celery處理非同步任務非常強悍,使用簡單,支援各種併發。但是,大家來看看我所遇到的乙個應用場景:每次後台上傳乙個遊戲母包,然後對這個母包處理(新增某種標識,比如id)生成多個遊戲子包,其中有一些id號的包是要求盡快的處理的,剩下的可以閒時處理。這裡就對要把乙個母包分成兩個任務來處理,其中乙個是優先處理的,另乙個是閒時處理。

【方案初探】
對於上面的場景,最先想到的方案是,把每個母包處理任務分成優先和閒時兩個celery任務佇列分別處理,分別單獨配給cpu資源(土豪的話給多一台機器也行)專門處理。大家估計也想到這種做法的弊端了,這樣無法有效使用資源,當優先任務佇列沒有任務時,閒時任務佇列卻滿載,顯然這種設計方案不是很好。

那麼有沒有更好的處理方案呢?試想如果任務可以按優先級別在佇列中排隊就好了。顯然celery並沒有提供優先佇列這種機制,那麼我們只能自己實現乙個celery一樣的非同步事件佇列,並且支援優先順序的佇列。這時候顯然想到的是redis。

【redis優先佇列】
redis中提供了blpop,rpush(rlpop,lpsuh)這些佇列操作。

來看看bloop的介紹:

blpop key [key …] timeout

blpop 是列表的阻塞式(blocking)彈出原語。

它是 lpop 命令的阻塞版本,當給定列表內沒有任何元素可供彈出的時候,連線將被 blpop命令阻塞,直到等待超時,或有另乙個客戶端對給定 key 的任意乙個執行 lpush 或 rpush 命令為止。

當給定多個 key 引數時,按引數 key的先後順序依次檢查各個列表,彈出第乙個非空列表的頭元素(這是就是實現優先順序的關鍵)。

那麼我們可以設定兩個key,乙個表示優先任務的key,姑且叫priority_task,另乙個閒時任務的key,就叫normal_task。在新增任務時,把對應任務所要必備引數新增的對應的key值佇列即可。具體如下:

priority_task = 

redis.rpush('priority_task', json.dumps(priority_task))

normal_task =

redis.rpush('normal_task', json.dumps(normal_task))

成功入隊後,接下來就是不斷從佇列中取出任務,然後對應處理,大概**如下:

while

1: # 監聽任務,沒有打包任務則阻塞

key, task = redis.blpop(['priority_task', 'normal_task'])

deal_task(key, json.loads(task))

【任務動態切換】
上面實現保證了每次從佇列取出的任務都是優先級別最高的,但是存在著問題,比如當前正在處理閒時任務,可是這個閒時任務可能要處理200+個包,這時候佇列中又來了乙個優先任務,那麼這個優先任務必須等待之前的閒時任務處理完成才能開始處理,這顯然不是我們想要的,那麼我們能掛起當前正在處理的閒時任務,先去處理優先任務嗎。顯然是可以的,就是乙個最簡單的協程:函式呼叫。只需要在閒時任務處理完每個子包後,檢查優先任務佇列是否有元素,有則呼叫函式先處理優先任務,等優先任務完成後,再繼續處理閒時任務。

處理函式大概如下:

def

deal_task

(key, task):

# 任務處理函式

id_list = task['id_list'] # 要生成的子包id

for id in id_list:

do something.... # 生成對應的id子包

if key == 'normal_key': # 如果當前是閒時任務

while redis.llen('priority_key') > 0: # 檢查是否有優先任務,有則獲取並執行

priority_task = redis.lpop('priority_key')

if priority_task:

deal_task('priority_key', json.loads(priority_task)) # 執行優先任務處理

以上就是實現乙個單程序處理非同步優先任務佇列的全過程。

【多程序化】
上面實現都是單程序處理的,為了提公升處理效率,我們可以開多個程序提公升併發量,這裡建議使用supervisor來管理你的這些程序。這裡需要注意:

1. 多程序處理臨界資源,如果沒有相關臨界資源的競爭那最好,如果有,那麼你必須考慮怎麼處理,一般是用佇列順序化。

2. supervisor持久化程序資料庫鏈結,會導致資料庫雖然已經斷開連線,但是程序並不知曉,當程序再次執行資料庫查詢時就會出錯,mysql一般會報乙個gone away的錯誤。

注:還可以用程序池非同步處理。

【最後】
以上是本人的處理方案,如果有更好的建議記得留下寶貴的意見(>▽<)。

Redis實現優先順序佇列

title redis實現優先順序佇列 tags 基於目前系統中存在部分非同步需求,比如匯入或者新開客戶車輛匹配vin碼等 redis中使用列表作為佇列 最關鍵提供了阻塞版本的指令blpop 新建三個佇列對應高中低優先順序 比如f6car high f6car mid f6 car low 再新建d...

用redis實現支援優先順序的訊息佇列

用redis實現支援優先順序的訊息佇列 系統中引入訊息佇列機制是對系統乙個非常大的改善。例如乙個 web系統中,使用者做了某項操作後需要傳送郵件通知到使用者郵箱中。你可以使用同步方式讓使用者等待郵件傳送完成後反饋給使用者,但是這樣可能會因為網路的不確定性造成使用者長時間的等待從而影響使用者體驗。30...

使用Redis實現優先順序佇列

優先順序佇列是一種如先進先出佇列和堆疊資料結構的抽象資料型別。所不同的是每乙個元素關聯乙個 優先順序 優先順序高的元素比優先順序低的元素優先得到處理。本文講解如何基於redis的sorted set資料型別實現優先順序佇列。sorted set中元素關聯乙個score,可以按score有序查詢元素。...