Flink 實時計算 維表 Join 解讀

2021-10-19 09:33:36 字數 3061 閱讀 4889

flink 1.9 版本可以說是乙個具有里程碑意義的版本,其內部合入了很多 blink table/sql 方面的功能,同時也開始增強 flink 在批處理方面的能力,真的是向批流統一的終極方向開始前進。flink 1.9 版本在 8.22 號也終於發布了。本文主要介紹學習 flink sql 維表 join,維表 join 對於sql 任務來說,一般是乙個很正常的功能,本文給出**層面的實現,和大家分享使用者如何自定義 flink 維表。

維表作為 sql 任務中一種常見表的型別,其本質就是關聯表資料的額外資料屬性,通常在 join 語句中進行使用。比如源資料有人的身份證號,人名,你現在想要得到人的家庭位址,那麼可以通過身份證號去關聯人的身份證資訊,就可以得到更全的資料。

維表可以是靜態的資料,也可以是動態的資料(比如定時更新的資料),一般會通過特定的主鍵來進行關聯。它可以在 mysql 中進行儲存,也可以在 nosql 資料庫中進行儲存,比如 hbase等。

flink 1.9 中維表功能**於新加入的blink中的功能,如果你要使用該功能,那就需要自己引入 blink 的 planner,而不是引用社群的 planner。由於新合入的 blink 相關功能,使得 flink 1.9 實現維表功能很簡單,只要自定義實現 lookupabletablesource 介面,同時實現裡面的方法就可以進行,下面來分析一下 lookupabletablesource的**:

public

inte***ce

lookupabletablesource

extends

tablesource

isasyncenabled 方法主要表示該錶是否支援非同步訪問外部資料來源獲取資料,當返回 true 時,那麼在註冊到 tableenvironment 後,使用時會返回非同步函式進行呼叫,當返回 false 時,則使同步訪問函式。

可以看到 lookupabletablesource 這個介面中有三個方法

getlookupfunction 方法返回乙個同步訪問外部資料系統的函式,什麼意思呢,就是你通過 key 去查詢外部資料庫,需要等到返回資料後才繼續處理資料,這會對系統處理的吞吐率有影響。

getasynclookupfunction 方法則是返回乙個非同步的函式,非同步訪問外部資料系統,獲取資料,這能極大的提公升系統吞吐率。具體是否要實現非同步函式方法,這需要使用者自己判定是否需要對非同步訪問的支援,如果同步方法的吞吐率已經滿足要求,那可以先不用考慮非同步的實現情況。

getlookupfunction 會返回同步方法,這裡你需要自定義 tablefuntion 進行實現,tablefunction 本質是 udtf,輸入一條資料可能返回多條資料,也可能返回一條資料。使用者自定義 tablefunction 格式如下:

public

class

mylookupfunction

extends

tablefunction

public

void

eval

(object.

.. paramas)

}

open 方法在進行初始化運算元例項的進行呼叫,非同步外部資料來源的client要在類中定義為 transient,然後在 open 方法中進行初始化,這樣每個任務例項都會有乙個外部資料來源的 client。防止同乙個 client 多個任務例項呼叫,出現執行緒不安全情況。

eval 則是 tablefunction 最重要的方法,它用於關聯外部資料。當程式有乙個輸入元素時,就會呼叫eval一次,使用者可以將產生的資料使用 collect() 進行傳送下游。paramas 的值為使用者輸入元素的值,比如在 join 的時候,使用 a.id = b.id and a.name = b.name, b 是維表,a 是使用者資料表,paramas 則代表 a.id,a.name 的值。

getasynclookupfunction 會返回非同步訪問外部資料來源的函式,如果你想使用非同步函式,前提是 lookupabletablesource 的 isasyncenabled 方法返回 true 才能使用。

使用非同步函式訪問外部資料系統,一般是外部系統有非同步訪問客戶端,如果沒有的話,可以自己使用執行緒池非同步訪問外部系統。至於為什麼使用非同步訪問函式,無非就是為了提高程式的吞吐量,不需要每條記錄訪問返回資料後,才去處理下一條記錄。

非同步函式格式如下:

public

class

myasynclookupfunction

extends

asynctablefunction

public

void

eval

(completablefuture

> future, object.

.. params)

}

維表非同步訪問函式總體和同步函式實現類似,這裡說一下注意點:

外部資料來源非同步客戶端初始化。如果是執行緒安全的(多個客戶端一起使用),你可以不加 transient 關鍵字,初始化一次。否則,你需要加上 transient,不對其進行初始化,而在 open 方法中,為每個 task 例項初始化乙個。

eval 方法中多了乙個 completablefuture,當非同步訪問完成時,需要呼叫其方法進行處理.

為了減少每條資料都去訪問外部資料系統,提高資料的吞吐量,一般我們會在同步函式和非同步函式中加入快取,如果以前某個關鍵字訪問過外部資料系統,我們將其值放入到快取中,在快取沒有失效之前,如果該關鍵字再次進行處理時,直接先訪問快取,有就直接返回,沒有再去訪問外部資料系統,然後在進行快取,進一步提公升我們實時程式處理的吞吐量。

一般快取型別有以下幾種型別:

資料全部快取,定時更新。

lru cache,設定乙個超時時間。

使用者自定義快取。

flink 在 1.9 版本開源出維表功能,使用者可以結合自己的具體需求,自定義的去開發維表。flink 1.9 版本在flink sql方面的開源出很多功能,使用者可以自己選擇具體 planner進行使用,社群的planner、blink的 planner。希望 flink 在未來越來越好。

參考:

實時計算Flink 產品安全

實時計算 flink支援整體全鏈路實時計算的安全。賬號安全分為實時計算賬號安全以及資料儲存賬號安全,下面分別闡述。實時計算涉及到業務安全部分主要包含專案隔離安全以及業務流程安全。下面分別進行闡述。資料安全分為實時計算系統資料安全和業務資料安全,闡述如下。業務資料安全 實時計算本身不負責儲存使用者的業...

實時計算運維相關

切換到kafka後台安裝目錄 檢視所有topic kafka topics.sh list zookeeper xx.xx.xx.xx 2181 檢視所有groupid kafka consumer groups.sh new consumer bootstrap server xx.xx.xx.x...

實時計算Flink 獨享模式系統架構

獨享模式系統架構如下所示。系統架構說明 為了訪問您的vpc內服務,會在建立集群時,在您賬號下申請彈性網絡卡。通過彈性網絡卡,您即可訪問其vpc下所有資源。您的實時計算集群,如果需要訪問公網,可在彈性網絡卡上繫結nat閘道器及彈性公網ip,具體操作步驟,請參考阿里雲官網文件。實時計算在您的賬號下的乙個...