簡易版celery的實現

2021-09-14 01:53:25 字數 4417 閱讀 2790

最近學習了下,celery原始碼,看了一點點皮毛後,自己動手寫了個簡易的celery,通過redis作為broker,沒有複雜的路由匹配規則,佇列和任務之間乙個直接匹配的簡易規則。這裡對專案簡單的記錄下。

****** 是celery類所在位置,具體實現了celery的啟動,載入配置檔案,任務裝飾器;

utils 下base是任務類的實現,任務的發布方法,任務繫結celery例項方法;

redisbase 是連線redis資料的類,主要是往佇列裡插入資料;

example 是我自己寫的用例

# -*- coding: utf-8 -*-

'''******celery @2019 03 26

author xuxiaolong

redis 作為broker

redis 作為結果返回result_backend

'''from time import sleep

from utils.base import basetask

import json

import multiprocessing

from importlib import import_module

from utils.redisbase import redishelper

class celery(object):

def __init__(self, name):

self.name = name

self.queuedic = dict()

self._task = dict()

def start(self):

'''啟動方法

從redis list 中獲取message ,並找到對應的任務例項去執行,通過呼叫task.runtask()方法執行

'''#_redis = redishelper(self.host,self.port,self.db,self.password)

#為每乙個佇列開啟乙個程序,執行對應的任務

queue = set([v["queue"] for v in self.taskdic.values()])

# pool = multiprocessing.pool(processes=len(queue))

def runloop(queue):

_redis = redishelper(host=self.host, port=self.port, db=self.db, password=self.password)

while true:

retjson = _redis.lpop(queue)

print retjson

if retjson is none:

sleep(5) #加了個休眠,避免terminal 刷的太快

continue

message = json.loads(retjson)

#print message

#print self.taskdic

for fun in self.queuedic[queue]:

#print message["name"] == fun.split(".")[-1:][0]

if message["name"] == fun.split(".")[-1:][0]:

print self._task[message["name"].encode('utf-8')](*message["args"],**message["kwargs"])

for q in queue:

runloop(q)

#pool.close()

#pool.join()

def config_from_object(self, include=none):

'''獲取基本的配置資訊'''

self.config = import_module(include)

# redis_url 配置redis位址資訊

redis,hostpass, portdb = self.config.redis_url.split(':')

self.password, self.host = hostpass.split('@')

self.port, self.db = portdb.split('/')

# celery_route配置路由資訊

self.taskdic = self.config.celery_route

for k,v in self.taskdic.items():

if v["queue"] not in self.queuedic.keys():

self.queuedic[v["queue"]] = [k]

else:

#任務裝飾器

def task(self,*args,**kwargs):

#print args,kwargs

def create_inner_task():

def create_tasks(func):

ret = self.create_task_fromfun(func)

return ret

return create_tasks

return create_inner_task()

#通過函式建立任務物件

def create_task_fromfun(self,func):

tasks = type(func.__name__, (basetask,), dict())()

#print isinstance(tasks,basetask)

if tasks.name not in self._task:

self._task[tasks.name] = tasks.run

tasks.bind(self)

return tasks

這裡task裝飾器的,裝飾的是任務函式,原理是,將任務函式,轉換成乙個任務物件的例項,也就是basetask,並把方法體,賦給basetask的run方法,利用的是type元類的來實現的,

tasks = type(func.name, (basetask,), dict())()

這裡有個小地方需要注意下,任務函式需要轉成非繫結的方法,也就是通過staticmethod方法,裝成靜態方法;

start 方法,可以理解成監聽函式,是乙個無限迴圈,不斷的從配置的佇列中讀取資料,然後,交給tasks去執行,也就是呼叫我們上面說的,run方法

# -*- coding: utf-8 -*-

import json

from redisbase import redishelper

class basetask(object):

'''任務類的基類,所有任務的拓展都繼承此類

'''def runtask(self,*args,**kwargs):

'''任務執行的方法

'''messagedic = dict()

messagedic["name"] = self.name

messagedic["args"] = args

messagedic["kwargs"] = kwargs

print self.run(*args,**kwargs)

if k.split(".")[-1:][0] == self.name:

print _redis.lpush(v["queue"],json.dumps(messagedic))

''''''

runtask 是我們的任務的發布函式,通過在客戶端呼叫,往指定佇列插入資料,服務端讀取執行

redisbase 就不介紹了,沒啥可說的,就是運算元據庫的

exampe 是示例 :

celery.py檔案**:

from ****** import celery

# 載入配置

def add(x,y):

#print(x+y)

return x+y

if __name__ == '__main__':

這裡例項化我們的celery,並且實現具體的任務函式,記得一定要加上task裝飾器,和celery一樣 的,

celery.py 是在服務端執行的,它會去具體執行任務;

config.py

是配置檔案,安裝這個規則寫就行,示例如下:

redis_url=「redis//?****:8004/0」

celery_route=}

run.py:

from example.celery import add

if __name__ == '__main__':

print(add.runtask(1,2))

這是在客戶端執行的,具體的任務發布,通過runtask來發布

簡易版redux實現

redux其實只有幾個重要的api,getstate,subscribe和dispatch,getstate用來獲取狀態,subscribe監聽狀態的改變,dispatch派發事件改變狀態,下面就來看下。首先是createstore,它接收三個引數,分別是reducer函式,初始狀態值,還有就是中介...

簡易版的Tween

與之前的tween 類似,只是這個為簡潔版 動畫處理器 緩動效果 param obj dom物件 param prop 要改變的樣式屬性,如left 填opacity時,1表示不透明,0表示完全透明 param v1 初始值 不帶px param v2 終止值 不帶px param opt obje...

用C實現簡易版掃雷

用兩個盤實現該遊戲 乙個是雷盤,乙個是展示盤 就是玩遊戲的盤 該 可以實現以下幾個功能 1.列印雷盤和展示盤。隨機產生雷的位置 2.保證第一次掃雷不會被炸死。3.點一下可以展開一片。4.判斷是否贏。注意 要注意兩個盤的座標和下標。還有,呼叫函式和傳參。test.c include include i...