Spark讀取Kafka 高低階API

2021-10-06 08:35:55 字數 1042 閱讀 7755

1、kafkautils.createdstream

建構函式為kafkautils.createdstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 

使用了receivers來接收資料,利用的是kafka高層次的消費者api,對於所有的receivers接收到的資料將會儲存在spark executors中,然後通過spark streaming啟動job來處理這些資料,缺省會丟失,可啟用wal日誌,該日誌儲存在hdfs上 

a、建立乙個receiver來對kafka進行定時拉取資料,ssc的rdd分割槽和kafka的topic分割槽不是乙個概念,故如果增加特定主體分割槽數僅僅是增加乙個receiver中消費topic的執行緒數,並不增加spark的並行處理資料數量 

b、對於不同的group和topic可以使用多個receivers建立不同的dstream 

c、如果啟用了wal,需要設定儲存級別,即kafkautils.createstream(….,storagelevel.memory_and_disk_ser)。

2.kafkautils.createdirectstream

區別receiver接收資料,這種方式定期地從kafka的topic+partition中查詢最新的偏移量,再根據偏移量範圍在每個batch裡面處理資料,使用的是kafka的簡單消費者api 

優點: 

a、 簡化並行,不需要多個kafka輸入流,該方法將會建立和kafka分割槽一樣的rdd個數,而且會從kafka並行讀取。 

b、高效,這種方式並不需要wal,wal模式需要對資料複製兩次,第一次是被kafka複製,另一次是寫到wal中 

c、恰好一次語義(exactly-once-semantics),傳統的讀取kafka資料是通過kafka高層次api把偏移量寫入zookeeper中,存在資料丟失的可能性是zookeeper中和ssc的偏移量不一致。eos通過實現kafka低層次api,偏移量僅僅被ssc儲存在checkpoint中,消除了zk和ssc偏移量不一致的問題。缺點是無法使用基於zookeeper的kafka監控工具。

c語言資料型別高低階

若參與運算量的型別不同,則先轉換成同一型別,然後進行運算。轉換按資料長度增加的方向進行,以保證精度不降低。如int型和long型運算時,先把int量轉成long型後再進行運算。a.若兩種型別的位元組數不同,轉換成位元組數高的型別 b.若兩種型別的位元組數相同,且一種有符號,一種無符號,則轉換成無符號...

kafka消費者低階API

實現使用低階api讀取指定topic,指定partition,指定offset的資料。1 消費者使用低階api 的主要步驟 步驟主要工作 1根據指定的分割槽從主題元資料中找到主副本 2獲取分割槽最新的消費進度 3從主副本拉取分割槽的訊息 4識別主副本的變化,重試 指定分割槽,指定offset 1 根...

Spark讀取檔案

spark預設讀取的是hdfs上的檔案。如果讀取本地檔案,則需要加file usr local spark readme.md。測試時候發現,本地檔案必須在spark的安裝路徑內部或者平行 讀取hdfs檔案,可以這樣指定路徑 hdfs ns1 tmp test.txt。如果不指定任何字首,則使用hd...