apscheduler定時任務,異常重新執行任務

2022-07-20 11:03:11 字數 2074 閱讀 6445

from apscheduler.schedulers.blocking import

blockingscheduler

import

datetime

from apscheduler.events import

event_job_error, event_job_executed

deftest_1(a, b):

print

(a, b)

deftest_2(a, b):

print('

*'*16)

print

(a) c =0

#修改c的值,結束異常

if datetime.datetime(2020, 5, 26, 17, 19, 30) c = 1

print(b/c)

defjob_listener(event):

job =sched.get_job(event.job_id)

args =job.args

#正常結束任務

ifnot

event.exception:

#恢復原先的任務定時時間

sched.reschedule_job(event.job_id, trigger='

cron

', hour='

00', minute='

10', second='00'

)

print('

*'*20,'

成功', '

*'*20)

for job in

sched.get_jobs():

print

(job.name)

print

(job.trigger)

else

:

#計算當前時間5秒後的時間

next_datetime = datetime.datetime.now() + datetime.timedelta(seconds=5)

#修改出現異常的任務的定時,重新計算下次執行時間,本例為5秒後

sched.reschedule_job(event.job_id, trigger='

cron

', hour=next_datetime.hour, minute=next_datetime.minute, second=next_datetime.second)

msg = f"

jobname=|jobtrigger=|errcode=|exception=|traceback=|scheduled_time="if

__name__ == "

__main__":

service = 1seach_date_list = 2job_defaults =

#建立定時任務例項

sched =blockingscheduler()

sched.configure(job_defaults=job_defaults)

#新增任務1

sched.add_job(test_1,args=(service, seach_date_list,), trigger='

cron',

hour='

14', minute='

37', second='

00', id="

out_warehouse_order")

#新增任務2

sched.add_job(test_2,args=(service, seach_date_list,), trigger='

cron',

hour='

17', minute='

19', second='

00', id='

sale_after')

#建立監聽,任務出錯和任務正常結束都會執行job_listener函式

sched.add_listener(job_listener, event_job_error |\

event_job_executed)

#開始定時任務

sched.start()

apscheduler執行定時任務框架

最簡單用法 匯入模組 from apscheduler.schedulers.blocking import blockingscheduler from datetime import datetime 建立物件 scheduler blockingscheduler 建立定時任務 觸發器為 in...

使用apScheduler執行定時任務

從這篇博文可以了解到apscheduler的詳細解釋,也很簡潔優雅,改寫後可以直接用於生產環境!從這篇博文可以了解cron表示式的詳細解釋 coding utf 8 from apscheduler.schedulers.blocking import blockingscheduler impor...

APScheduler執行定時任務 簡單使用

本例目的 每天17 19 07列印 hello scheduler 版本3.3.1 tar zxvf 原始碼包名安裝 python setup.py installfrom apscheduler.schedulers.blocking import blockingscheduler schedu...