Flink維度關聯的幾種思路

2021-10-10 18:42:45 字數 3832 閱讀 9881

在實際生產中,我們經常會有這樣的需求,需要以原始資料流作為基礎,然後關聯大量的外部表來補充一些屬性。例如,我們在訂單資料中,希望能得到訂單收貨人所在省的名稱,一般來說訂單中會記錄乙個省的 id,那麼需要根據 id 去查詢外部的維度表補充省名稱屬性。

在 flink 流式計算中,我們的一些維度屬性一般儲存在 mysql/hbase/redis 中,這些維表資料存在定時更新,需要我們根據業務進行關聯。根據我們業務對維表資料關聯的時效性要求,有以下幾種解決方案:

實時查詢維表是指使用者在 flink 運算元中直接訪問外部資料庫,比如用 mysql 來進行關聯,這種方式是同步方式,資料保證是最新的。但是,當我們的流計算資料過大,會對外部系統帶來巨大的訪問壓力,一旦出現比如連線失敗、執行緒池滿等情況,由於我們是同步呼叫,所以一般會導致執行緒阻塞、task 等待資料返回,影響整體任務的吞吐量。而且這種方案對外部系統的 qps 要求較高,在大資料實時計算場景下,qps 遠遠高於普通的後台系統,峰值高達十萬到幾十萬,整體作業瓶頸轉移到外部系統。

這種方式的核心是,我們可以在 flink 的 map 運算元中建立訪問外部系統的連線。下面以訂單資料為例,我們根據下單使用者的城市 id,去關聯城市名稱,核心**實現如下:

public class order 

public class dimsync extends richmapfunction

public order map(string in) throws exception

pst.close();

return new order(cityid,username,items,cityname);

}public void close() throws exception

}

在上面這段**中,richmapfunction 中封裝了整個查詢維表,然後進行關聯這個過程。需要注意的是,一般我們在查詢小資料量的維表情況下才使用這種方式,並且要妥善處理連線外部系統的執行緒,一般還會用到執行緒池。最後,為了保證連線及時關閉和釋放,一定要在最後的 close 方式釋放連線,否則會將 mysql 的連線數打滿導致任務失敗。

全量預載入資料是為了解決每條資料流經我們的資料系統都會對外部系統發起訪問,以及對外部系統頻繁訪問而導致的連線和效能問題。這種思路是,每當我們的系統啟動時,就將維度表資料全部載入到記憶體中,然後資料在記憶體中進行關聯,不需要直接訪問外部資料庫。

這種方式的優勢是我們只需要一次性地訪問外部資料庫,大大提高了效率。但問題在於,一旦我們的維表資料發生更新,那麼 flink 任務是無法感知的,可能會出現維表資料不一致,針對這種情況我們可以採取定時拉取維表資料。並且這種方式由於是將維表資料快取在記憶體中,對計算節點的記憶體消耗很高,所以不能適用於數量很大的維度表。

我們還是用上面的場景,根據下單使用者的城市 id 去關聯城市名稱,核心**實現如下:

public class wholeload extends richmapfunction catch (exception e) 

}},5,5, timeunit.minutes);//每隔 5 分鐘拉取一次維表資料

}@override

public order map(string value) throws exception

public void load() throws exception

con.close();}}

在上面的例子中,我們使用 scheduledexecutorservice 每隔 5 分鐘拉取一次維表資料。這種方式適用於那些實時場景不是很高,維表資料較小的場景。

優點:實現簡單

缺點:僅支援小資料量維表

適用場景:維錶小,變更頻率低,對變更及時性要求低

通過 distributed cache 分發本地維度檔案到task manager後載入到記憶體關聯。

實現方式

通過env.registercachedfile註冊檔案。

實現richfunction,在open()中通過runtimecontext獲取cache檔案。

解析和使用檔案資料。

優點:不需要外部資料庫

缺點:支援維度資料量比較小,更新需要更改檔案並重啟作業

適用場景:維度資料是以檔案形式,資料量小,更新頻率低。比如:靜態碼表,配置檔案。

實時流與熱儲存上維度資料關聯, 使用 cache 減輕儲存訪問的壓力.

實現方式: 將維度資料匯入熱儲存 redis/tair/hbase/es, 通過非同步 io 查詢熱儲存, 利用 cache 機制將維度資料快取在記憶體:

//最多儲存10000條

.maximumsize(10000)

//過期時間為1分鐘

.expireafterwrite(60, timeunit.seconds)

.build();

整體的實現思路是:我們利用 flink 的 richasyncfunction 讀取 hbase 的資料到快取中,我們在關聯維度表時先去查詢快取,如果快取中不存在這條資料,就利用客戶端去查詢 hbase,然後插入到快取中。

首先我們需要乙個 hbase 的非同步客戶端:

org.hbase

asynchbase

1.8.2

核心**如下:

public class lru extends richasyncfunction

@override

public void asyncinvoke(string input, resultfutureresultfuture) throws exception else

return null;

});}}}

這裡需要特別注意的是,我們用到了非同步 io (richasyncfunction),這個功能的出現就是為了解決與外部系統互動時網路延遲成為系統瓶頸的問題。

我們在流計算環境中,在查詢外部維表時,假如訪問是同步進行的,那麼整體能力勢必受限於外部系統。正是因為非同步 io 的出現使得訪問外部系統可以併發的進行,並且不需要同步等待返回,大大減輕了因為網路等待時間等引起的系統吞吐和延遲問題。

我們在使用非同步 io 時,一定要使用非同步客戶端,如果沒有非同步客戶端我們可以自己建立執行緒池模擬非同步請求。

優點:維度資料不受限於記憶體,支援較多維度資料

缺點:需要熱儲存資源,維度更新反饋到結果有延遲(熱儲存匯入,cache)

適用場景:維度資料量大,可接受維度更新有一定的延遲。

除了上述常見的處理方式,我們還可以通過:

果沒有非同步客戶端我們可以自己建立執行緒池模擬非同步請求。

優點:維度資料不受限於記憶體,支援較多維度資料

缺點:需要熱儲存資源,維度更新反饋到結果有延遲(熱儲存匯入,cache)

適用場景:維度資料量大,可接受維度更新有一定的延遲。

除了上述常見的處理方式,我們還可以通過:

總體來講,關聯維表的方式就以上幾種方式,並且基於這幾種方式還會衍生出各種各樣的解決方案。我們在評價乙個方案的優劣時,應該從業務本身出發,不同的業務場景下使用不同的方式。

Flink 建立DataSet的幾種方式

flink 建立dataset的幾種方式 package woaixuexi import aa.myclass import org.apache.flink.api.scala.import org.apache.flink.configuration.configuration author ...

oracle update關聯表的思路總結

1 其中最普通的是update t1 set b select b from t2 where t1.a t2.a 但是,要注意空值的影響,如果怕空值的影響,要寫成 update t1 set tname select sname from t2 where t1.id t2.id where ex...

oracle update關聯表的思路總結

1 其中最普通的是update t1 set b select b from t2 where t1.a t2.a 但是,要注意空值的影響,如果怕空值的影響,要寫成 update t1 set tname select sname from t2 where t1.id t2.id where ex...