celery實現微信小程式訊息推送

2021-09-23 22:39:50 字數 4435 閱讀 2051

使用如下目錄結構:

celery.py建立應用例項

config.py引數配置檔案

tasks.py執行任務檔案(生產者檔案)

from __future__ import absolute_import

from celery import celery

'celery_proj'

, include=

['celery_proj.tasks'])

'celery_proj.config'

)if __name__ ==

'__main__'

:)

from __future__ import absolute_import

from datetime import timedelta

celery_result_backend =

'redis:'

broker_url =

'redis:'

celery_timezone =

'asia/shanghai'

celerybeat_schedule =

,}

from __future__ import absolute_import

import requests

import sys

".."

)#呼叫封裝好的redis資料庫記錄推送狀態 已推送?未推送? 可以通過設定key的過期時間設定最小推送時間間隔

from model.redis_models import redis_db

from model.ord_models import

*import datetime

template_id =

'wjuawz1dghzjcok0w2bevwkzdorxitbnze5gufher9m'

'wx64118b44bbd3bfa3'

secret_key =

defget_access_token

:try

: payload =

req = requests.get(

'', params=payload,

timeout=3,

verify=

false

) access_token = req.json(

).get(

'access_token',""

)print

('access_token'

, access_token)

return access_token

except exception as e:

print

(e)def

push

( openid,

formid,

item_info:

dict

, access_token,

template_id=template_id)

:'''推送'''

data ='.

format

( item_info.get(

'item_pass_id'

,'未知'))

,"data":,

'keyword2':,

'keyword3':}

,"emphasis_keyword":''

} push_url =

''.format

( access_token)

result = requests.post(push_url, json=data, timeout=

3, verify=

false

)return result

@ap.task

defpullmsg

(delta=60)

:'''推送訊息'''

access_token = get_access_token(

) now = datetime.datetime.now(

) timedel = datetime.datetime.now(

)+ datetime.timedelta(minutes=delta)

# 搜尋所有快要開始的活動

objlist = db.session.query(ordobject.obj_id)

.filter

( ordobject.startord_time <= timedel,

ordobject.startord_time >= now)

# 搜尋相關訂單

ordlist = db.session.query(orderinfo.ordnum)

.filter

( orderinfo.objid.in_(objlist)

)# 查詢可推送的使用者和例項 此處為專案/活動

# 搜尋所有符合的使用者,預定資訊,專案名稱,位址, 並且逐個推送

# all (1, 'jhon', 'xixixi', 'bbbui0egbjy1zhbyw2khdufwvjje', datetime.datetime(2019, 5, 5, 14, 0))

result = db.session.query(

item.item_id,

item.item_name,

item.item_address,

item.pass_id,

order.ord_usid,

db.func.

min(

ordobject.startord_time)

).join(

orderinfo,

order.ord_num == orderinfo.ordnum)

.outerjoin(ordobject)

.filter

( ordobject.obj_id == orderinfo.objid,

ordobject.startord_time <= timedel,

ordobject.startord_time <= now)

.group_by(

item.item_id,

order.ord_usid)

.all()

# 按狀態推送 已推送的不推送

status_db = redis_db(

'pushmsg_status'

) formid_db = redis_db(

'formid'

)if result and access_token:

for i in result:

item_id, openid = i[0]

, i[4]

# 檢查是否已推送

response = status_db.check_pushmsg_status(item_id, openid)

if response:

# 已推送 下個

continue

else

:# 未推送

# 記錄狀態為已推送 假設未推送成功也最少24小時後再試

response = status_db.set_pushmsg_status(item_id, openid)

formid = formid_db.get_formid(openid)

if formid:

# 推送 無法推送的情況,需要 改進redis過期時間

item_info =

.. :'

.format

( i[5]

.year,

i[5]

.month,

i[5]

.day,

i[5]

.hour,

i[5]

.minute)

}return push(openid, formid, item_info, access_token)

else

:# 無法推送

return

'without formid!'

return

'no result for!'

else

:return

'no result!'

微信小程式傳送 模板訊息

實現步驟 1.先在前端獲取fromid,openid 2.將fromid,openid存入對應使用者的資料庫 3.下來就是寫模板訊息,查詢對應使用者的fromid和openid,將key值對應寫上 4.獲取access token,儲存時間7200 5.呼叫模板方法即可 前端 js獲取fromid存...

微信小程式客服自動回覆訊息後端實現

註冊賬號並授權 ngrok authtoken yourtoken 如果不註冊就會過期 8h之後生成的鏈結即會失效 本地埠內網穿透 ngrok http 8553 訪問 重新啟動之後更新網域名稱 可 ngrok http 8553 subdomain 4c4e91f7 如何使用ngrok生成固定的u...

微信小程式 幾步實現模版訊息的傳送

1.首先先建立乙個簡單的wxml頁面,包括乙個簡單的form表單 特別注意必須加上report submit true 2.獲取使用者 要傳送的使用者 的openid js 生命週期函式 監聽頁面載入 onload function options success function res php後...