kafka實現wordcount並實現累加操作

2021-09-01 05:41:57 字數 1009 閱讀 6081

先在kafka輸入下列命令充當生產者

./bin/kafka-console-producer.sh  --broker-list 192.168.147.133:9092,192.168.147.134:9092,192.168.147.135:9092 --topic test034

下列**當消費者

package day14

import org.apache.spark.streaming.dstream.receiverinputdstream

import org.apache.spark.sparkconf

import org.apache.spark.streaming.kafka.kafkautils

import org.apache.spark.streaming.

/** * kafka的receive方式實現wc

* receive

* 1.重點:首先會以dstream中的資料進行按key做reduce操作,然後再對各個批次的資料進行累加

2.updatestatebykey 方法中

updatefunc就要傳入的引數,他是乙個函式。seq[v]表示當前key對應的所有值,option[s] 是當前key的歷史狀態,返回的是新的

*/object kafkawc

some(currvaluesum + prevalue.getorelse(0))

}).print(50) // 預設10條 ,加引數可設定列印條數

// val result = words.reducebykey(_+_)

//new hashpartitioner(ssc.sparkcontext.defaultparallelism), true, data))

// result.print()

ssc.start()

ssc.awaittermination()

}}

hadoop執行自帶例項wordcount

作業系統 ubuntu hadoop版本 3.1.3 cd usr local hadoop bin hdfs namenode format 格式化namenode sbin start dfs.shbin hdfs dfs mkdir input 新建input資料夾 bin hdfs dfs ...

Hadoop偽分布式執行wordcount小例子

先說點小知識 hadoop fs 使用面最廣,可以操作任何檔案系統 hadoop dfs和hdfs dfs只能操作hdfs相關的 先建資料存放目錄和結果輸出目錄 guo guo opt hadoop hadoop 2.7.2 hdfs dfs mkdir data input guo guo opt...

Hadoop偽分布式執行wordcount例子

1.進入hadoop目錄,新建乙個test.log檔案,cat命令檢視檔案內容 2.啟動yarn和dfs,一種是全部啟動start all.sh,另外一種分別啟動,如下圖的提示 4.把新建的檔案傳到hdfs的data input中,用ls命令檢視是否傳遞成功 5.進入mapreduce目錄 6.ls...