先在kafka輸入下列命令充當生產者
./bin/kafka-console-producer.sh --broker-list 192.168.147.133:9092,192.168.147.134:9092,192.168.147.135:9092 --topic test034
下列**當消費者
package day14
import org.apache.spark.streaming.dstream.receiverinputdstream
import org.apache.spark.sparkconf
import org.apache.spark.streaming.kafka.kafkautils
import org.apache.spark.streaming.
/** * kafka的receive方式實現wc
* receive
* 1.重點:首先會以dstream中的資料進行按key做reduce操作,然後再對各個批次的資料進行累加
2.updatestatebykey 方法中
updatefunc就要傳入的引數,他是乙個函式。seq[v]表示當前key對應的所有值,option[s] 是當前key的歷史狀態,返回的是新的
*/object kafkawc
some(currvaluesum + prevalue.getorelse(0))
}).print(50) // 預設10條 ,加引數可設定列印條數
// val result = words.reducebykey(_+_)
//new hashpartitioner(ssc.sparkcontext.defaultparallelism), true, data))
// result.print()
ssc.start()
ssc.awaittermination()
}}
hadoop執行自帶例項wordcount
作業系統 ubuntu hadoop版本 3.1.3 cd usr local hadoop bin hdfs namenode format 格式化namenode sbin start dfs.shbin hdfs dfs mkdir input 新建input資料夾 bin hdfs dfs ...
Hadoop偽分布式執行wordcount小例子
先說點小知識 hadoop fs 使用面最廣,可以操作任何檔案系統 hadoop dfs和hdfs dfs只能操作hdfs相關的 先建資料存放目錄和結果輸出目錄 guo guo opt hadoop hadoop 2.7.2 hdfs dfs mkdir data input guo guo opt...
Hadoop偽分布式執行wordcount例子
1.進入hadoop目錄,新建乙個test.log檔案,cat命令檢視檔案內容 2.啟動yarn和dfs,一種是全部啟動start all.sh,另外一種分別啟動,如下圖的提示 4.把新建的檔案傳到hdfs的data input中,用ls命令檢視是否傳遞成功 5.進入mapreduce目錄 6.ls...