Faust python分布式流式處理框架

2022-09-06 11:51:23 字數 2370 閱讀 3013

faust是用python開發的乙個分布式流式處理框架。在乙個機器學習應用中,機器學習演算法可能被用於資料流實時處理的各個環節,而不是僅僅在推理階段,演算法也不僅僅侷限於常見的分類回歸演算法,而是會根據業務需要執行乙個十分差異化的任務, 例如:在我們的時序異常檢測應用中, 前處理階段的變點檢測演算法。這就要求流處理框架除了具備進行常規的轉換聚合操作之外,可以支援更加強大的任意自定義邏輯和更加複雜的自定義狀態,能夠更好地與原生的python演算法**緊密結合在一起。在主流的flink, spark streaming不能滿足我們的個性化需求時, faust為我們提供了乙個選擇.

本文將對faust框架的主要功能進行概要描述。

faust以kafka作為資料傳輸和自組織管理的媒介,可以直接在faust應用中定義kafka主題。

table是faust中的分布式鍵值對資料表,可用於儲存流處理過程中的中間狀態。

agent是資料處理流中的乙個基本處理單元,通過從kafka中攝取指定topic中的資料,並進行相應的處理。

可定義事件處理後的額外操作,比如推送告警等。乙個sink可以是乙個callable、非同步callable、另外乙個主題、另外乙個agent等等。

**函式

非同步**

另外乙個topic

另外乙個agent

stream是乙個無限的非同步可迭代物件,從topic中消費資料。

stream的常規使用為:

也可以自己建立乙個stream:

乙個stream可以有任意多個處理器**。

每個faustworker會啟動乙個kafka consumer消費資料。如果兩個agent消費了相同的主題,那麼兩個agent會分別受到相同的訊息,每次訊息被回執,那麼引用級數-1,當引用計數為0時,consumer就可以提交偏移量了。

groupby

對流進行重新分割槽。新的流會使用乙個新的中間主題,並以相應的字段作為鍵,這個新的流是agent最終迭代的流。

流分割槽的關鍵字不僅可以是資料中的字段,也可以是乙個callable。

take

快取資料。

through

將流推送到乙個新的topic,並迭代新的topic裡的資料.

filter

過濾操作.

agents迭代streams, streams迭代channels.

model

model用來描述資料結構, 例如:

class point(record, serializer='json'):

x: int

y: int

匿名agent

匿名agent不顯示地使用乙個topic,而是自己建立topic,在定義好訊息型別後,只需直接向該agent傳送相應地訊息即可.

await my_agent.send(key=point(x=10, y=20), value=point(x=30, y=10))

schema

定義鍵值的型別和序列化反序列化器

collections

model中的乙個field可以是乙個其他型別資料的列表.

from typing import list

import faust

class user(faust.record):

accounts: list[account]

table是faust中的分布式記憶體資料表,使用kafka的changelog topic作為後端進行持久化和容錯.

table的修改只能在流操作只能進行, 否則會報錯.

co-partitioning tables and streams

table的任何鍵的資料只能存在於一台主機上.有狀態的流處理要求table和stream協同分割槽,即同一臺主機處理的流和table必須共享相同的分割槽.因此在操作table的流迭代中需要對流重新分割槽.

如果要進行的計算分別以兩個不太的字段分組,則應使用兩個不同的agent, 分別groupby.

window的定義

可以定義window使用的時間,包括系統時間relativ_to_now(), 當前流的處理時間relative_to_current(),相對資料中的時間欄位relative_to_field().

事件亂序

windowed table可以正確處理亂序, 只要遲到的資料在table的過期時間內.

分布式 分布式鎖

本質是利用redis的setnx 方法的特性來加鎖,setnx 即key不存在則設定key,否則直接返回false,要求在分布式系統中使用同乙個redis服務,以下提供兩種解決方案 1 直接使用redistemplate 這其實並不能完全保證高併發下的安全問題,因為可能在鎖過期之後該執行緒尚未執行完...

分布式 分布式事務

是資料庫執行過程中的乙個邏輯單位,由乙個有限的資料庫操作序列構成。事務的acid四大特性 原子性 atomicity 事務作為乙個整體被執行。一致性 consistency 從乙個一致的狀態轉換到另乙個一致的狀態。隔離性 isolation 多個事務併發執行時,併發事務之間互相影響的程度。永續性 d...

分布式之分布式事務

被人問到分布式事務,之前學rabbitmq 的時候學到過rabbitmq 高階的事務,因為沒有用過,所有沒有回答好。這裡總結一下。1.單機版事務。事務的四大特性 acid a.原子性 b.一致性 c.隔離性 d.永續性 單機事務可以通過設定事務的隔離級別 參見spring 的事務隔離級別 2.分布式...