celery的task任務結果在redis儲存亂碼

2021-10-09 14:43:04 字數 1510 閱讀 3653

用redis做backend儲存celery非同步任務的結果,

可以看到任務成功存進去了,

127.0.0.1:6379[1]> keys *

celery-task-meta-31a89019-4106-4685-8a71-0075d976d91e

不過用這個key去查詢的時候可以看到結果亂碼:

127.0.0.1:6379[1]> get celery-task-meta-31a89019-4106-4685-8a71-0075d976d91e

�}q(ustatusqusuccessqu tracebackqnuresultqchildrenq]u.

但是如果用官方asyncresult提供的api是可以正常取到結果的。

python**:

def get_result(task_id):

res = asyncresult(task_id)

while true:

print(res.status)

if res.ready():

break

time.sleep(1)

print(res.result)

print(res.get()

順著官方的res.status一路追蹤原始碼,可以看到取資料的方法也是從redis中用key拿到原始資料,只不過在做了一些解碼和序列化相關的操作。

拿到status的呼叫棧的過程大致是:state()->_get_task_meta()-get_task_meta()->_get_task_meta_for()->decode_result()->decode()->loads()->pickle_loads()

最終結果就是如果用的json序列化,會呼叫kombu庫中serialization模組的pickle_loads方法。

所以我們也直接用pickle_loads這個方法就可以正常拿到redis中的資料。

**如下:

# -*- coding: utf-8 -*-

import time

from kombu.serialization import pickle_loads

from redis import redis

redis_client = redis(db=1)

res= redis_client.get('celery-task-meta-31a89019-4106-4685-8a71-0075d976d91e')

#亂碼結果

print "亂碼結果:%s"%res

res = pickle_loads(res)

#正常結果

print "正常結果:%s"%res

輸出:

亂碼結果:�}q(ustatusqusuccessqu    tracebackqnuresultqchildrenq]u.

正常結果:

Celery任務佇列

使用任務佇列作為分發任務的機制。乙個任務佇列的輸入是一組被稱為任務的工作單元。專用的工人會持續監聽任務佇列來等待完成新的工作。celery通過訊息進行通訊,通常使用中間人作為客戶端和工人 workers 間的媒介。為了初始化一項任務,客戶端會新增一條訊息到佇列中,然後中間人傳遞這條訊息給乙個work...

celery 任務模組

每天不知道忙啥,到了這個點才開始學習 1.新建python檔案 from future import absolute import 絕對路徑的匯入 from celery import celery from django.conf import settings import os 設定系統的環...

celery 執行celery定時任務

場景 在虛擬機器上執行 python django celery redis 的定時任務 可能遇到的問題 如果在執行過程中,定時任務突然退出,並報以下錯誤,錯誤顯示,沒有許可權訪問一些目錄檔案 解決方案 1 關閉當前redis服務 在step 3中有描述如何關閉 2 以root使用者執行啟動redi...