使用flink實現讀取並放入mysql中

2021-10-01 19:29:49 字數 1902 閱讀 1378

//建立環境

executionenvironment env = executionenvironment.

getexecutionenvironment()

; dataset text = env.

readtextfile

("檔案路徑");

//讀取檔案,對檔案中的單詞進行計數

dataset> counts =

text.

flatmap

(new tokenizer()

)// group by the tuple field "0" and sum up tuple field "1"

.groupby(0

).sum(1)

;*//使用flink進行儲存mysql時,必須使用row的泛型進行儲存,這裡將處理的結果進行轉化

dataset map = counts.

map(new mapfunction

, row>()});

//mysql連線

string driverclass =

"com.mysql.cj.jdbc.driver"

; string dburl =

"jdbc:mysql://localhost:3306/資料庫?characterencoding=utf8&usessl=false&servertimezone=utc&rewritebatchedstatements=true "

; string usernmae =

"使用者名稱"

; string password =

"密碼"

; string sql =

"insert into 表名 (name,age) values (?,?)"

;//將資料寫入mysql資料庫

map.

output

(jdbcoutputformat.

buildjdbcoutputformat()

.setdrivername

(driverclass)

.setdburl

(dburl)

.setusername

(usernmae)

.setpassword

(password)

.setquery

(sql)

.finish()

);//這個必須要執行,否則看不到結果

1.將flink  job部署,將jar包上傳即可,可上傳名字相同的jar,他們會有不同的id

2.檢視jar的id

訪問http://localhost:8081/jars 位址即可看到

3.使用postman 進行測試

entryclass 定位到要執行的job類,programargs 請求的引數。

返回jobid說明任務執行成功,資料庫可看到結果。請求位址使用的是id進行訪問的,官網說明使用jar id。請求的引數在官網的api有詳細說明。

移步flink rest api檢視詳細資訊

NoIFS 使用shell讀取cfg並拆成陣列

需要解析乙個文字,裡面描述了客戶分割槽引數,形式如下 imageconfig.cfg env 1024 2048 env.fex kernel 2049 33791 boot.img recovery 33792 164863 recovery.img玩法就是把每一行拆成單獨的資料,送入對應陣列中,...

Jquery讀取xml並實現省市級聯

首先這是本人所使用的xml文件 provice city town.xml 位置在根目錄下的xml資料夾下。下面的是js 建議放在單獨js檔案中,本人暫時將放在html中。function city change function else function bindprovice 最下面的是htm...

使用NPOI讀取Excel資料並寫入SQLite

首先,我們來建乙個資料庫,我們就叫hello.db 不一定是db字尾,你可以sqlite,sqlite3,db3 都可以作為識別,然後往裡面建乙個空的 如下圖所示 然後建乙個excel 往 裡面寫入一些資料,我這裡只是demo形式,可以根據自己的實際情況,稍作修改 然後開始建乙個新的專案,我這裡用的...