Kafka Stream簡單示例(一)

2021-09-11 19:59:42 字數 2437 閱讀 4822

最近想統計一些訊息資料,原計畫接收kakfa訊息後自行統計然後存入資料庫(統計相對比較簡單,所以沒有考慮使用apache storm), 突然想起來kafka已經提供kakfa stream功能,於是開始看kafka stream。 下面的例子非常簡單,只是在kafka提供的例子上做了一點修改。

因為我們使用的kafka stream所以新增的依賴是kafka-streams, 不是以前經常使用的kafka-clients.

我的kafka安裝在windows 10上面(為了方便測試,平時在公司時可以直接連線到kafka集群,開發時先在本地執行,於是在windows10上安裝了kafka)。 版本kafka_2.12-1.0.0

>

>

org.apache.kafkagroupid

>

>

kafka-streamsartifactid

>

>

1.0.2version

>

dependency

>

官方示例的**在`這裡

官方示例中向topic直接傳送了溫度資料。 我修改一下。 向topic傳送json格式的資料,裡面包含了溫度和濕度。例如

注意:該**只在官方示例上修該資料格式,其他部分和官方示例一樣。啟動程式後直接向topic iot-temperature傳送格式為的訊息即可看到執行效果。

public

class

temperaturedemo})

.groupbykey()

.windowedby

(timewindows.

of(timeunit.seconds.

tomillis

(temperature_window_size)))

.reduce

(new

reducer

()else}}

).tostream()

//過濾條件就是溫度大於20

stream執行結果存放在topic iot-temperature-max中, 我們檢視該topic的資料。 只有大於temperature_threshold (20)被存入該topic

oracle job簡單示例

廢話不說,本篇記錄乙個簡單job示例,採用oracle 10i與pl sql developer工具。完成乙個job必須具備三元素 1 table 使用者關心的資料表,用於job更新等 2 procedure 封裝使用者對table的操作 3 job 描述什麼時間 執行頻率使用procedure來操...

jsoncpp簡單示例

scons platform linux gcc 編譯出來的庫檔案在其libs linux gcc 4.4.2目錄下,有libjson linux gcc 4.4.2 libmt.so和libjson linux gcc 4.4.2 libmt.a。標頭檔案在解壓目錄下的include中。我的jso...

jsoncpp簡單示例

1 編譯jsoncpp mkdir usr jsoncpp cp r include usr jsoncpp cp r libs usr jsoncpp 2 jsoncpp簡單例項 1 反序列化json物件 比如乙個json物件的字串序列如下,其中 array 表示json物件中的陣列 那怎麼分別取...