python非同步任務佇列示例

2022-10-06 05:27:08 字數 1534 閱讀 7525

很多場景為了不阻塞,都需要非同步**機制。這是乙個簡單的例子,大家參考使用吧

複製** **如下:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import logging

import queue

import threading

def func_a(a, b):

return a + b

def func_b():

pass

def func_c(a, b, c):

return a, b, c

# 非同步任務佇列

_task_queue = queue.queue()

def async_call(function, callback, *args, **kwargs):

_task_queue.put()

def _task_queue_consumer():

"""非同步任務佇列消費者

"""while true:

try:

task = _task_queue.get()

function = task.get('function')

callback = task.get('callback')

args = task.get('args')

kwargs = task.get('kwargs')

&程式設計客棧nbsp;        程式設計客棧;   try:

if callback:

callback(function(*args, **kwargs))

except exception as ex:

if callback:

&nbswww.cppcns.comp;            callback(ex)

finally:

&nbswww.cppcns.comp;  _task_queue.task_done()

except exception as ex:

logging.warning(ex)

def handle_result(result):

print(type(result), result)

if __name__ == '__main__':

t = threading.thread(target=_task_queue_consumer)

t.daemon = true

t.start()

async_call(func_a, handle_result, 1, 2)

async_call(func_b, handle_result)

async_call(func_c, handle_result, 1, 2, 3)

async_call(func_c, handle_result, 1, 2, 3, 4)

_task_queue.join()

本文標題: python非同步任務佇列示例

本文位址: /jiaoben/python/107058.html

DelayQueue延時佇列示例

delayqueue是乙個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部是延遲期滿後儲存時間最長的delayed 元素。快取系統的設計,快取中的物件,超過了空閒時間,需要從快取中移出 任務排程系統,能夠準確的把握任務的執行時間。我們可能需要通過執行緒處理很多時間上要求很嚴格的資料,如果...

celery 非同步任務佇列

celery是基於python開發的分布式任務佇列。它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務 async task 和定時任務 crontab 它的架構組成如下圖 celery 4.x以上版本不安裝該模組,新增任務時會報錯 使用celery包含...

Python中的非同步任務佇列 arq

引言 最近在用 sanic 寫東西,所有涉及到io阻塞的 都需要用 aio 的模組,好在近年來 asyncio 生態圈發展的還算不錯,該有的都有 近期業務中 登入 註冊 業務涉及的很複雜 涉及到邀請 需要解鎖 傳送簡訊等操作,想來這麼個模組整的很繁瑣,以後加個滑動驗證那還了得。於是乎,想整乙個類似於...