celery 學習筆記(二)

2021-08-11 10:58:57 字數 4319 閱讀 9516

從圖上我們可以看出celery包含幾個模組

主要包括非同步任務和定時任務,非同步任務通常在業務邏輯中被觸發併發送到任務佇列中,而定時任務是由celery beat程序週期性的將任務發往任務佇列。

broker就是任務排程佇列,接收任務生產者傳送過來的訊息,將任務存入佇列,之所以需要中間人的原因是celery本身是不提供訊息佇列的服務,所以需要第三方元件實現,包括rabbitmq,redis,mongodb等,我這裡先了解下redis

worker是執行任務的單元,它實時的監控訊息佇列,如果有任務就獲取它並執行,它併發的執行分布在系統節點中。

backend用來儲存worker執行的任務的結果,celery支援以不同方式儲存任務的結果,包括redis,mongodb,django orm,amqp等,這裡我先不去看它是如何儲存的,就先選用redis來儲存任務執行結果。

1、celery的安裝

sudo pip install celery
2、redis的安裝

sudo pip install redis
然後進行簡單的配置

broker_url = 'redis://localhost:6379/0'
url的格式為:

redis://:password@hostname:port/db_number
url scheme 後的所有欄位都是可選的,並且預設為 localhost 的 6379 埠,使用資料庫 0。我的配置是:

broker_url  = 'redis://localhost:6379/15'
使用celery實現非同步任務主要包含三個步驟:

1、建立乙個celery例項

2、啟動celeryworker

3、應用程式呼叫非同步任務

task1.py

-*- coding:utf-8 -*-

import time

from celery import celery

from config import broker_url,celery_result_backend

broker=broker_url, #broker_url = 'redis://localhost:6379/15'

backend=celery_result_backend) #celery_result_backend = 'redis://localhost:6379/15'

defsendmessage

(message):

#任務print message

time.sleep(2)

return message

在當前目錄下使用下邊的方法來啟動

celery worker -a task1 --loglevel=info
---- **** -----

--- * *** * -- linux-4.2.0-27-generic-x86_64-with-ubuntu-14.04-trusty 2017-11-2816:

17:35-- * - **** ---

-** ---------- [config]

tasks1:

0x7fc4c34ebc10

-** ---------- .> transport:

redis://localhost:6379/

15-** ---------- .> results:

redis://localhost:6379/

15-*** --- * --- .> concurrency:

4 (prefork)

-- ******* ---- .> task events:

off (enable -e to monitor tasks in this worker)

--- ***** -----

-------------- [queues]

.> celery exchange=celery(direct) key=celery

[tasks]

. task1.sendmessage

[2017-11-28 16:17:35,374: info/mainprocess] connected to redis://localhost:6379/15

[2017-11-28 16:17:35,377: info/mainprocess] mingle: searching for neighbors

[2017-11-28 16:17:36,386: info/mainprocess] mingle: all alone

重新開啟乙個控制台,在當前目錄開啟python控制台,

>>> 

from task1 import sendmessage

>>> sendmessage.delay('hello lijiajia')

51dce370-076c-4222-a5d4-88722c8da630>

在上邊我們從task1.py中匯入了sendmessage任務物件,然後使用delay()方法將任務傳送到訊息中介軟體broker,celery worker程序監控到該任務後,就會進行執行,將視窗切換到worker 視窗,就會看到以下輸出:

[2017-11-28

16:21:11,345: info/mainprocess] received task: task1.sendmessage[51dce370-076c-4222

-a5d4-88722c8da630]

[2017-11-28

16:21:11,347: warning/forkpoolworker-4] hello lijiajia

[2017-11-28

16:21:13,366: info/forkpoolworker-4] task task1.sendmessage[51dce370-076c-4222

-a5d4-88722c8da630] succeeded in

2.01967258751s: 'hello lijiajia'

說明任務已經被排程並執行成功。

我們如果想獲取執行後的結果,可以這樣:

>>> res=sendmessage.delay('i am studying celery')

>>> res.ready() #使用ready()檢視任務是否執行完畢

true

>>> res.get() #使用get()獲取任務結果。

u'i am studying celery'

上邊我們是在python 環境中呼叫任務,事實上我們通常在應用程式中呼叫任務

client.py

-*- coding:utf-8 -*-

from task1 import sendmessage

#非同步任務

sendmessage.delay('i am lijiajia')

print

'hello ,i am lijiajia'

執行命令python client.py,雖然任務函式 sendmessage需要兩秒才返回結果,但他由於是非同步任務,不會阻塞當前的任務環境,因此主程式會往下執行print語句,列印出結果

worker中輸出:

[2017-11-28

16:31:57,444: info/mainprocess] received task: task1.sendmessage[70b97e6b-62ac-40bc-ad0a-1e0d67a8c706]

[2017-11-28 16:31:57,445: warning/forkpoolworker-4] i am lijiajia

[2017-11-28

16:31:59,448: info/forkpoolworker-4] task task1.sendmessage[70b97e6b-62ac-40bc-ad0a-1e0d67a8c706] succeeded in 2.00329515897s: 'i am lijiajia'

列印結果:

$ python client.py

hello ,i am lijiajia

celery學習筆記

ubuntu系統下安裝指令 安裝rabbitmq sudo apt get install rabbitmq server 安裝celery sudo easy install celery 注意 這裡我開始採用官方文件的指令 pip install celery不過發現用不了,執行後面的指令提示不...

celery學習筆記

1.windows下 啟動redis命令 redis server redis.windows.conf 如果出現 5512 23 dec 16 53 14.121 creating server tcp listening socket 127.0.0.1 6379 bind no error 這...

Celery學習筆記(一)

在學習celery之前,我先簡單的去了解了一下什麼是生產者消費者模式。在實際的軟體開發過程中,經常會碰到如下場景 某個模組負責產生資料,這些資料由另乙個模組來負責處理 此處的模組是廣義的,可以是類 函式 執行緒 程序等 產生資料的模組,就形象地稱為生產者 而處理資料的模組,就稱為消費者。單單抽象出生...