Celery的實踐指南

2022-05-08 12:12:08 字數 4225 閱讀 1946

celery的實踐指南

celery原理:

celery實際上是實現了乙個典型的生產者-消費者模型的訊息處理/任務排程統,消費者(worker)和生產者(client)都可以有任意個,他們通過訊息系統(broker)來通訊。

典型的場景為:

客戶端啟動乙個程序(生產者),當使用者的某些操作耗時較長或者比較頻繁時,考慮接入本訊息系統,傳送乙個task任務給broker。

後台啟動乙個worker程序(消費者),當發現broker中儲存有某個任務到了該執行的時間,他就會拿過來,根據task型別和引數執行。

實踐中的典型場景:

執行celery的worker,讓他作為consumer執行,自動從broker上獲得任務並執行。`celery -a celery_demo worker`

執行celery的client,讓其根據schedule,自動生產出task msg,並發布到broker上。`celery -a celery_demo beat`

安裝並執行flower,方便監控task的執行狀態`celery flower -a celery_demo`

或者設定登入密碼 `

celery flower -a celery_demo --basic_auth=user1:password1,user2:password2

多同步任務-鏈式任務-

失敗自動重試的task失敗重試方法: 將task**函式引數增加self,同時繫結bind。

自動重試後,是否將任務重新入queue後排隊,還是等待指定的時間?可以通過self.retry()引數來指定。

派發到不同queue佇列的task乙個task自動對映到多個queue中的方法, 通過配置task和queue的routing_key命名模式。比如:把queue的exchange和routing_key配置成通用模式:

再定義task的routing_key的名稱:

可用的不同exchange策略:direct:直接根據定義routing_key

topic:exchange會根據萬用字元來將乙個訊息推送到多個queue。

fanout:將訊息拆分,分別推送到不同queue,通常用於超大任務,耗時任務。

參考:高階配置result是否儲存

失敗郵件通知:

關閉rate limit:

auto_reload方法(*nix系統):celery通過監控源**目錄的改動,自動地進行reload

使用方法:1.依賴inotify(linux) 2. kqueue(os x / bsd)

安裝依賴:

$ pip install pyinotify

(可選) 指定fsnotify的依賴:

$ env celeryd_fsnotify=stat celery worker -l info --autoreload

auto-scale方法:啟用auto-scale

臨時增加worker程序數量(增加consumer):

$ celery -a proj control add_consumer foo -d worker1.local

臨時減少worker程序數量(減少consumer):

啟動停止worker的方法:啟動 as daemon : 

root使用者可以使用celerydom

e/ru

n/ce

lery

/'>home/run/celery/

home/run/celery/home/log/celery/%n.log"

或者 celery worker —detach

停止ps auxww | grep 'celery worker' | awk '' | xargs kill -9

與flask整合的方法整合後flask將充當producer來建立並傳送task給broker,在celery啟動的獨立worker程序將從broker中獲得task並執行,同時將結果返回。

flask獲得

與flask整合後的啟動問題由於celery的預設routing_key是根據生產者在**中的import級別來設定的,所以worker端在啟動時應該注意其啟動目錄應該在專案頂級目錄上,否者會出現keyerror。

效能提公升: eventlet 和 greenlet

官方參考:

celery的實踐指南

celery原理:

celery實際上是實現了乙個典型的生產者-消費者模型的訊息處理/任務排程統,消費者(worker)和生產者(client)都可以有任意個,他們通過訊息系統(broker)來通訊。

典型的場景為:

客戶端啟動乙個程序(生產者),當使用者的某些操作耗時較長或者比較頻繁時,考慮接入本訊息系統,傳送乙個task任務給broker。

後台啟動乙個worker程序(消費者),當發現broker中儲存有某個任務到了該執行的時間,他就會拿過來,根據task型別和引數執行。

實踐中的典型場景:

執行celery的worker,讓他作為consumer執行,自動從broker上獲得任務並執行。`celery -a celery_demo worker`

執行celery的client,讓其根據schedule,自動生產出task msg,並發布到broker上。`celery -a celery_demo beat`

安裝並執行flower,方便監控task的執行狀態`celery flower -a celery_demo`

或者設定登入密碼 `

celery flower -a celery_demo --basic_auth=user1:password1,user2:password2

多同步任務-鏈式任務-

失敗自動重試的task失敗重試方法: 將task**函式引數增加self,同時繫結bind。

自動重試後,是否將任務重新入queue後排隊,還是等待指定的時間?可以通過self.retry()引數來指定。

派發到不同queue佇列的task乙個task自動對映到多個queue中的方法, 通過配置task和queue的routing_key命名模式。比如:把queue的exchange和routing_key配置成通用模式:

再定義task的routing_key的名稱:

可用的不同exchange策略:direct:直接根據定義routing_key

topic:exchange會根據萬用字元來將乙個訊息推送到多個queue。

fanout:將訊息拆分,分別推送到不同queue,通常用於超大任務,耗時任務。

參考:高階配置result是否儲存

失敗郵件通知:

關閉rate limit:

auto_reload方法(*nix系統):celery通過監控源**目錄的改動,自動地進行reload

使用方法:1.依賴inotify(linux) 2. kqueue(os x / bsd)

安裝依賴:

$ pip install pyinotify

(可選) 指定fsnotify的依賴:

$ env celeryd_fsnotify=stat celery worker -l info --autoreload

auto-scale方法:啟用auto-scale

臨時增加worker程序數量(增加consumer):

$ celery -a proj control add_consumer foo -d worker1.local

臨時減少worker程序數量(減少consumer):

啟動停止worker的方法:啟動 as daemon : 

root使用者可以使用celerydom

e/ru

n/ce

lery

/'>home/run/celery/

home/run/celery/home/log/celery/%n.log"

或者 celery worker —detach

停止ps auxww | grep 'celery worker' | awk '' | xargs kill -9

與flask整合的方法整合後flask將充當producer來建立並傳送task給broker,在celery啟動的獨立worker程序將從broker中獲得task並執行,同時將結果返回。

flask獲得

與flask整合後的啟動問題由於celery的預設routing_key是根據生產者在**中的import級別來設定的,所以worker端在啟動時應該注意其啟動目錄應該在專案頂級目錄上,否者會出現keyerror。

效能提公升: eventlet 和 greenlet

官方參考:

Celery入門指南

個人理解celery分布式訊息佇列就是乙個生產者消費者模式,celery產生任務交給中間人broker 在這裡使用redis作為中間人 中間人將任務分發給眾多的worker來完成任務。看乙個簡單的專案 建立tasks.py from celery import celery def add x,y ...

celery 視覺化 Celery 最佳實踐

1.用好celery beat 如果你想更好的管理專案的定時任務,可以用celery beat代替crontab管理。celery不僅支援動態的非同步任務 通過delay呼叫 也支援定時任務執行。當然我們可以用crontab實現任務的定時執行,但是crontab是與專案 隔離的,為了更方便地管理定時...

LINQ TO SQLite實踐指南

前言 當前,軟體應用程式中,資料庫已經成為不可缺少的重要組成部分.然而傳統資料庫正趨向巨無霸化,對系統的要求一步步提高,管理成本也越來越大,對於中小型專案的應用,它的很多功能變得越來越多餘,但是我卻不得不為這些不需要的功能付出更多的資金和人力成本.在這些場景,嵌入式資料庫的輕量,零部署,跨平台,義移...