Kafka擴充套件內容

2021-10-02 22:28:52 字數 2652 閱讀 2358

# 低階api:

props.

put(

"group.id"

,"01");

# offset自動重置,offset可能因為快取刪除,序號不一定從0開始

props.

put(consumerconfig.auto_offset_reset_config,

"earliest");

# 高階api:

consumer.

seek

(new

topicpartition

(topic, partititon, offset)

);

消費者組id、topic和partition唯一確定乙個offset。

可以檢視_consumer_offsets這個topic裡的資料。

intercetpor的實現介面是org.apache.kafka.clients.producer.producerinterceptor。

onsend(producerrecord)方法在訊息被序列號及計算分割槽之前呼叫。

onacknowledgement(recordmetadata, exception)在訊息被應答或傳送失敗時呼叫。

// 配置檔案新增***鏈

// props.put(producerconfig.interceptor_classes_config, list);

public

class

interceptor

implements

producerinterceptor

@override

public

void

onacknowledgement

(recordmetadata metadata, exception exception)

else

}@override

public

void

close()

@override

public

void

configure

(map?> configs)

}

輕量級(功能性弱) ,實時性(非微批次處理,視窗允許亂序資料,允許資料遲到),一條條資料處理。

>

>

org.apache.kafkagroupid

>

>

kafka-streamsartifactid

>

>

$version

>

dependency

>

// kafka streams處理案例

public

class

streams

,"source").

addsink

("sink"

,"second"

,"processor");

// 建立配置檔案

properties props =

newproperties()

; props.

put(

"bootstrap.servers"

,"hadoop01:9092");

props.

put(

,"kafkastreams");

// 建立kafka streams物件

kafkastreams kafkastreams =

newkafkastreams

(topologybuilder, props)

;// 開啟流處理

kafkastreams.

start()

;}}// topologybuilder.addprocessor中的prossorsupplier返回的processor類這裡做定製

public

class

myprocessor

implements

processor

<

byte

,byte

>

// 處理邏輯

@override

public

void

process

(byte

bytes,

byte

bytes2)

@override

public

void

punctuate

(long l)

@override

public

void

close()

}

資料傳輸層。

flume:cloudera公司研發,適合多個生產者,適合下游資料消費者不多的情況(費記憶體),適合資料安全性要求不高的操作,適合與hadoop生態圈對接的操作。

kafka:linkedin公司研發,適合資料下游消費眾多的情況(快取資料跟消費者個數無關),適合資料安全性較高的操作(資料在磁碟,備份),支援relication。

多個agent後台資料,交由乙個agent彙總,對接kafka,離線/實時兩條線消費。

物件擴充套件內容

const function def es6可以簡潔如下表示,上下是一樣的 const function def 我們知道訪問屬性的方式 點運算子和中括號運算子,區別在於點運算子後面不可以是變數或者數字,而中括號卻可以!let obj obj.name 3 obj 1 kangkang obj ad...

如何擴充套件Kafka的broker

背景 因為公司收集終端盒子資料的kafka服務偶爾會倒,所以考慮使用kafka的分布式,增加broker節點,來提高系統的可用性。當然,zookeeper服務節點也是可以增加的,但不在本文範圍內。具體步驟如下 1.新加kafka服務,並啟動 如果是同一伺服器,則可以拷貝新建server.proper...

oldboyshell程式設計擴充套件內容

oldboyshell程式設計擴充套件內容 一 命令的優先順序 命令分為 alias compound commands function build in hash path error command not found 獲取乙個命令會按照上述優先順序取尋找,先找同名的alias命令,再找com...