DStreams輸入 從kafka消費資料 直連

2022-08-15 20:03:12 字數 1858 閱讀 6619

基本資料來源

檔案資料來源

自定義資料來源

本地檔案讀不出來

hdfs 讀取例項: 提前需要在 hdfs 上建好目錄。data輸入下面的命令

1 ./spark-shell --master spark://

linux04:7077

2 import

org.apache.spark.streaming._

3 val ssc=new streamingcontext(sc,seconds(1))

4 ssc.textfilestream("hdfs:").flatmap(_.split(" ")).map((_,1)).reducebykey(_+_).print()

5 hdfs dfs -put c.txt /data

6 ssc.start()

如下圖:

rdd佇列讀取資料

自定義資料來源

企業中如果沒有合適的抓取資料的手段,可以通過繼承 receiver,並實現 onstart、onstop 方法來自定義資料源採集。

注:啟動nc 的時候只啟動服務端

sparkstreaming 連線 kafka作為資料來源 reciver和直連方式

kafka版本的區別

資料從kafka中讀出來

spark讀取flume

要引入flume jar包

集群中

在fluem的conf下面 建立 新增內容
2.刪除flume中其中乙個jar刪除

再將下面的jar上傳到 flume的lib中

輸入命令啟動:[root@linux05 bin]# ./flume-ng agent -c conf -f ../conf/tail_spark.conf -n a1

輸入資料檢視結果? 看1.的圖

如何從ActiveMQ平滑遷移到Kafka?

直入主題,不討論為什麼遷移,直接談遷移方案。既然是從amq ativemq的簡稱 遷移到kafka,那麼遷移過程中肯定需要做到平滑遷移 對於業務沒有影響,對於上下游系統沒有依賴。由於系統一般會和多個上游,多個下游通過mq中介軟體保持依賴關係,遷移的過程中,肯定要做到各個系統上線沒有任何依賴。打個比方...

從scanf角度看待輸入

c primer plus中對scanf進行了一番詳解 假定使用了 d說明符來讀取乙個整數。scanf 函式開始試圖讀取乙個輸入字元,它跳過空白字元直到遇到乙個非空白字元,當碰到整數或者 或者 時,它就儲存並讀取下乙個字元 如果接下來的字元是乙個數字,它就儲存,並讀取下乙個字元直到遇到乙個非數字的字...

從文字特徵到輸入

建立乙個向量,向量的每一位表示某單詞的出現次數。步驟 1.先做詞嵌入,如word2vec 2.抽取一組與 輸出類別相關的向量,對特徵向量進行組合 拼接 加減乘除等 得到輸入向量x 3.將x輸入到非線性分類器中。大部分神經網路工具包不能很好滴處理高維係數向量,然而這一障礙可以通過工程方法解決。稠密表示...