原始碼分析kafka是如何傳送訊息的(一)

2021-10-04 18:58:53 字數 3011 閱讀 8936

從這章開始講kafka生產者。首先講的是kafka的傳送訊息流程。kafka傳送訊息的流程和rocketmq有點類似,無非就是建立網路連線,獲取kafka集群broker 的ip port位址,topic 的分割槽partition資訊,選擇乙個分割槽partition然後將訊息傳送到該分割槽partition對應的broker上。事實上是不是這樣的呢?

上面我們簡單說了一下kafka傳送訊息的流程,現在讓我們看一下乙個傳送訊息的demo,如下圖,建立乙個kafkaproducer物件,指定kafka位址,建立一條訊息message,指定mesage的topic,調send方法將訊息傳送出去。

properties properties=new properties();

properties.put("bootstrap.servers","localhost:9092");//kafkabroker 位址

properties.put("key.serializer","org.apache.kafka.common.serialization.stringserializer");

properties.put("value.serializer","org.apache.kafka.common.serialization.stringserializer");

properties.put("acks","all");//isr集合都寫入才返回

// properties.put("enable.idempotence",true);

properties.put("batch.size","1");//

kafkaproducerproducer=new kafkaproducer(properties);

producerrecord record = new producerrecord("topic-test-1", "test-2", "value");//訊息

futurevalue = producer.send(record);//send 這裡非同步傳送

recordmetadata recordmetadata = value.get();//獲取結果

producer.flush();

system.out.println("producerdemo send result , topic="+recordmetadata.topic()+" ,partition="

+recordmetadata.partition()+", offset="+recordmetadata.offset());

調producer.send方法之後到底發生了什麼呢?  client是如何獲取topic所有分割槽資訊的呢?topic又會選擇topic哪個分割槽傳送訊息呢?訊息是同步傳送到kafka還是非同步傳送到kafka呢?我們下面進入 send方法看一下:

@override

public futuresend(producerrecordrecord)

public futuresend(producerrecordrecord, callback callback)

從上面可以看出呼叫過程,我們發現有個 producerinterceptors ***,這個***提供了幾個方法,會在訊息傳送前,和傳送後執行,如果我們需要在訊息傳送前或傳送成功後做點事情,可以實現這個***。

接著往下看,未完待續。。。。

private futuredosend(producerrecordrecord, callback callback)  catch (classcastexception cce) 

byte serializedvalue;

try catch (classcastexception cce)

int partition = partition(record, serializedkey, serializedvalue, metadata.fetch());

int serializedsize = records.log_overhead + record.recordsize(serializedkey, serializedvalue);

ensurevalidrecordsize(serializedsize);

tp = new topicpartition(record.topic(), partition);

long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

log.trace("sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

// producer callback will make sure to call both 'callback' and interceptor callback

callback interceptcallback = this.interceptors == null ? callback : new interceptorcallback<>(callback, this.interceptors, tp);

if (result.batchisfull || result.newbatchcreated) partition {} is either full or getting a new batch", record.topic(), partition);

this.sender.wakeup();

}return result.future;

// handling exceptions and record the errors;

// for api exceptions return them in the future,

// for other exceptions throw directly

} catch (apiexception e) {

kafka原始碼分析 kafkaconsumer

kafkaconsumer 對於多執行緒訪問是不安全的,通過使用acquire 跟release 方法來操作atomiclong currentthread字段 儲存當前訪問執行緒id 有多個執行緒同時訪問丟擲concurrentmodificationexception,來防止對個執行緒同時訪問。...

Kafka原始碼分析(一)

apache kafka 是 乙個分布式流處理平台.這到底意味著什麼呢?我們知道流處理平台有以下三種特性 它可以用於兩大類別的應用 為了理解kafka是如何做到以上所說的功能,從下面開始,我們將深入探索kafka的特性。首先是一些概念 kafka有四個核心的api 讓我們首先深入了解下kafka的核...

Kafka原始碼分析之KafkaProducer

kafkaproducer是乙個kafka客戶端實現,可以發布記錄records至kafka集群。kafkaproducer是執行緒安全的,多執行緒之間共享單獨乙個producer例項通常會比多個producer例項要快。kafkaproducer包含一組快取池空間,儲存尚未傳輸到集群的記錄reco...