Flink原始碼解析 資料來源讀入原理

2021-08-20 18:23:30 字數 2772 閱讀 3758

flink是分布式平行計算框架,所以flink程式內在是分布和並行的,其並行的特性可在下述**片段體現:

val env = executionenvironment.getexecutionenvironment

val text = env.readtextfile(inputpath)

val data = text.flatmap(_.split(" "))

var count1 = 0

val counter1 = data.map

counter1.print()

其中inputpath檔案中儲存的內容為以下格式:

a

b cd

該段**輸出為:

(b,1)

(c,2)

(a,1)

(d,1)

由以上簡單示例可知,flink程式在讀入文字時是並行讀入的,提交flink job後,每一行資料為dataset中的乙個資料單元,由某乙個taskmanager中的某乙個slot進行計算,因此常規的累加操作是針對乙個slot中需要處理的資料,無法對整體的資料進行累加操作。但是在程式設計過程中,發現一件很奇怪的事:如果不使用env.readtextfile讀取資料,而使用env.fromelements讀取資料,程式可以正常進行計數,其輸出結果為:

(a,1)

(b,2)

(c,3)

(d,4)

於是筆者檢視了flink的原始碼,發現fromelements的具體實現是這樣的:

def fromelements[t: classtag : typeinformation](data: t*): dataset[t] =
它呼叫了fromcollection建立的datasource,而fromcollection的具體實現是這樣的:

def fromcollection[t: classtag : typeinformation](

data: iterable[t]): dataset[t] =

readtextfile的具體實現是這樣的:

def readtextfile(filepath: string, charsetname: string = "utf-8"): dataset[string] =
比較fromcollection方法和readtextfile方法的具體實現,可以看出其大致過程其實基本一致,無非就是new乙個datasource然後返回,但是可以看出其構造datasource的引數型別有些不同,具體哪個引數型別有問題,我們可以繼續觀察datasource類的建構函式,如下:

public datasource(executionenvironment context, inputformatinputformat, typeinformationtype, string datasourcelocationname) 

this.inputformat = inputformat;

if (inputformat instanceof nonparallelinput)

}

ok,flink原始碼跟蹤到這基本要水落石出了,我們可以看出,建構函式中寫了乙個if判斷,如果inputformatnonparallelinput介面的乙個例項,則讀取資料的過程並行度設定為1。fromelements方法中的輸入型別引數為collectioninputformat,檢視該類實現了哪些介面,如下:

public class collectioninputformatextends genericinputformatimplements nonparallelinput {}
由此可見,fromelements方法之所以能夠對整體進行計數,是由於其底層實現將該過程的並行度設定為1。

綜上,我們如果需要使用readtextfile方法對資料進行有序讀取、計數,則可以根據flink原始碼中fromelements方法的實現思路,將讀取資料操作的並行度設定為1。當資料量龐大時,這樣的做法會可能會導致計算從資料來源處開始癱瘓,因此最好不要採用該種方法,**測試可以考慮採用該種方法。

那麼還有什麼方法可以在並行環境下對整體資料進行計數呢?可以參照很多種語言中都有的static靜態變數的思路,靜態變數可以在它的作用域內,被所有類例項共享。因此可以考慮將用於計數的count變數設定為被整個flink程式共享的乙個變數,保證在任意taskmanager的任意的slot中都是對同乙個count變數進行更新。一開始考慮使用廣播變數將用於計數的count變數廣播到每乙個並行度中,但廣播變數必須是dataset[t]型別的運算元,並且每乙個slot只能對廣播變數進行訪問,暫沒有找到可以修改廣播變數的方法,因此這個處理的想法夭折了。目前可考慮的方法只有設定並行度或者通過文字預處理達到計數目的,若有新的想法會在部落格更新,也歡迎討論。

Flink 原始碼解析

1 flink 原始碼解析 原始碼編譯執行 2 flink 原始碼解析 專案結構一覽 3 flink 原始碼解析 local 模式啟動流程 4 flink 原始碼解析 standalone session 模式啟動流程 5 flink 原始碼解析 standalone session cluster...

List原始碼解析之ArrayList原始碼分析

arraylist是基於陣列實現的,是乙個動態擴充套件的陣列,容量可自動增長。arraylist是非執行緒安全的,只能在單執行緒環境下使用,多執行緒環境考慮使用collections.synchronizedlist list list 函式返回乙個執行緒安全的arraylist類,也可以使用con...

Flink 原始碼解析 專案結構一覽

flink 原始碼專案結構一覽 1 flink 從0到1學習 apache flink 介紹 2 flink 從0到1學習 mac 上搭建 flink 1.6.0 環境並構建執行簡單程式入門 3 flink 從0到1學習 flink 配置檔案詳解 4 flink 從0到1學習 data source...