Flink state應用 實現topN

2021-10-22 08:54:47 字數 2671 閱讀 9057

獲取資料源,自定義下沉器本處暫時不贅述,主要是對核心topn的**進行解析

獲取資料流並轉化成物件

datastream

datastream = datastreamsource.

map(value-

> jsonobject.

parseobject

(value,useraction.

class))

;

將亂序資料抽取出來,設定watermark

datastream

timeddata = datastream.

assigntimestampsandwatermarks

(new

useractiont***tractor()

)public

static

class

useractiont***tractor

extends

boundedoutofordernesstimestampextractor

@override

public

long

extracttimestamp

(useraction useraction)

}

過濾出購買行為

datastream

filterdata = timeddata.

filter

(new

filterfunction

})

視窗統計購買數量

datastream

windoweddata = filterdata.

keyby

("itemid").

timewindow

(time.

minutes

(60l)

,time.

minutes

(5l)).

aggregate

(new

countagg()

,new

windowresultfunction()

)//商品購買實體類

public

static

class

itembuycount

//count聚合函式

public

static

class

countagg

implements

aggregatefunction

@override

public long add

(useraction useraction,long acc)

@override

public long getresult

(long acc)

@override

public long merge

(long acc1,long acc2)

}public

static

class

windowresultfunction

implements

windowfunction

}

計算topn

datastream

> topitems = windoweddata.

keyby

("windowend").

process

(new

topnhotitems(3

));

public

static topnhotitems extends

keyedprocessfunction

>

@override

public

void

open

(configuration parameters)

throws exception

@override

public

void

processelement

(itembuycount input,context context, collector

> collector)

throws exception

@override

public

void

ontimer

(long timestamp,ontimercontext ctx,collector

> out)

throws exception

//請空狀態,釋放空間

itemstate.

clear()

; allitems.

sort

(new

comparator

()})

; list

itembuycounts =

newarraylist

<

>()

;for

(int i=

0;i) out.

collect

(itembuycounts);}

}

10.最後把得到的流輸出到自定義的sink中

python 簡單的socket應用(針對TCP)

出處 可以總結為以下五個步驟 1.建立socket套接字 2.繫結ip和port 3.listen使套接字變為被動連線 4.accept等待客戶端連線 5.recv send接受和傳送資料 先做一些準備工作 import socket server socket.socket family,type...

Linux共享目錄 特殊許可權t的應用

建立乙個共享目錄,使其他使用者可以在這個目錄下建立檔案,但是不能刪除別的使用者建立的檔案 建立乙個共享目錄,並設定許可權 root catyuan mkdir test1 root catyuan chmod 777 var test root catyuan chmod o t var test ...

python實現excel內容逐行寫入txt

最近在做文字分類,拿到的資料很亂。要做下一步,不管是分詞還是tfidf都要先做資料的分類。3萬篇文章,在乙個excel中,每行有每篇文章的id 內容 title content 分類 relative breeds 共三列 按分類建立子目錄,文章按分類放入子目錄中,每篇文章寫入乙個txt檔案,txt...