Kafka2 0生產者客戶端原始碼分析

2021-09-25 06:06:10 字數 3626 閱讀 1074

初始化引數配置。

初始化記錄累加器 recordaccumulator。

初始化 kafka 連線 kafkaclient,發現集群的所有節點加入快取。

初始化實現了 runnable 介面的 sender 物件,並在 iothread 中啟動執行緒。

執行訊息***

查詢 kafka 集群元資料

序列化 key、value

獲取分割槽

把訊息新增到記錄累加器中

當 batch 滿了,或者建立了新的 batch 後,喚醒 sender 執行緒

核心原始碼如下

public futuresend(producerrecordrecord, callback callback) 

private futuredosend(producerrecordrecord, callback callback)

return result.future;

}

如果根據指定的主題和分割槽能在快取中查詢到,則直接返回元資料,結束流程。

否則,設定需要更新元資料的標記 needupdate=true,並獲取當前的 version。

喚醒 sender 執行緒,當 sender 執行緒判斷 needupdate=true 時,傳送獲取元資料的請求到 broker,獲取到後更新 needupdate=true,version+1。

當前執行緒判斷,如果 version 變大,說明元資料已更新,則跳出迴圈,拉取新的元資料,判斷是否匹配到主題和分割槽,如果沒有匹配到,返回第2步。

如果 version 沒變大,說明元資料還沒更新,則呼叫 wait(long timeout) 方法,等待 timeout 時間後,返回第4步。

當第4步獲取到匹配的元資料後,返回給 dosend 方法。

核心原始碼如下

private clusterandwaittime waitonmetadata(string topic, integer partition, long maxwaitms)  catch (timeoutexception ex) 

cluster = metadata.fetch(); // 重新獲取元資料

elapsed = time.milliseconds() - begin;

if (elapsed >= maxwaitms) // 超出最大等待時間,丟擲異常

throw new timeoutexception("failed to update metadata after " + maxwaitms + " ms.");

remainingwaitms = maxwaitms - elapsed;

partitionscount = cluster.partitioncountfortopic(topic);

} while (partitionscount == null); // 分割槽數量是 0,繼續上述迴圈

if (partition != null && partition >= partitionscount)

return new clusterandwaittime(cluster, elapsed);

}// 等待更新

public synchronized void awaitupdate(final int lastversion, final long maxwaitms) throws interruptedexception

}

kafka 使用緩衝池技術給訊息分配堆位元組快取 heapbytebuffer,緩衝池的空閒佇列free存放了空閒的快取佇列,優先直接從中取出第乙個進行分配快取,如果緩衝池不夠了,利用 reentrantlock + condition 構造等待佇列,等待緩衝池足夠分配。

kafka 在處理訊息響應時,釋放分配的記憶體,並把加入空閒佇列 free。

// 緩衝池

public class bufferpool

// 位元組緩衝分配

public bytebuffer allocate(int size, long maxtimetoblockms) throws interruptedexception else

remainingtimetoblockns -= timens;

// 直接在空閒佇列分配

if (accumulated == 0 && size == this.poolablesize && !this.free.isempty()) else

}accumulated = 0; // 清空

}} }

if (buffer == null) // 沒有在空閒佇列分配到記憶體,需要在堆上分配記憶體

return new heapbytebuffer(size, size);

else

return buffer;

}private void freeup(int size)

// 處理生產者響應訊息時,釋放分配的記憶體

public void deallocate(bytebuffer buffer, int size) else

condition moremem = this.waiters.peekfirst();

if (moremem != null)

moremem.signal();

} finally

}

累加器使用 copyonwritemap 來快取訊息,key 是主題分割槽資訊,value 是個雙端佇列,佇列中的物件是壓縮後的批量訊息。

// 累加器快取

concurrentmap> batches = new copyonwritemap<>();

copyonwritemap 是執行緒安全的,是由 kafka 實現的寫時複製 map,內部定義了 volatile 的 map,讀時不用加鎖,直接讀取,寫時需要加鎖,然後拷貝乙個 map 副本進行實際的寫入,寫入完成後再把原來的 map 指向修改後的 map。

雙端佇列 deque 實際上就是 arraydeque,非執行緒安全的,需要手動同步。使用雙端佇列可以在訊息傳送失敗時,把訊息直接放回佇列頭部進行重試。

// 累加訊息到快取

long timestamp,

byte key,

byte value,

header headers,

callback callback,

long maxtimetoblock) throws interruptedexception

byte maxusablemagic = apiversions.maxusableproducemagic();

// 計算訊息大小

int size = math.max(this.batchsize, abstractrecords.estimatesizeinbytesupperbound(maxusablemagic, compression, key, value, headers));

buffer = free.allocate(size, maxtimetoblock); // 利用 bufferpool 分配位元組快取

synchronized (dq)

}}

Kafka2 0消費者客戶端使用

kafka 通過 kafkaconsumer 構造器初始化生產者客戶端的配置。常用的重要配置,詳見官網。group.id 消費組 id key.serializer 實現了 kafka 序列化介面的類,用來序列化 key。value.serializer 實現了 kafka 序列化介面的類,用來序列...

Kafka2 0消費者客戶端使用

kafka 通過 kafkaconsumer 構造器初始化生產者客戶端的配置。常用的重要配置,詳見官網。group.id 消費組 id key.serializer 實現了 kafka 序列化介面的類,用來序列化 key。value.serializer 實現了 kafka 序列化介面的類,用來序列...

kafka學習筆記(二) 生產者介紹

生產者傳送訊息的過程 分割槽原則 資料可靠性保證 acsk 0 producer不等待broker的ack,這樣可以提供最低的延遲,broker一接收到還沒有寫入到磁碟就返回ack,當broker出現故障的時候可能會丟失資料 1 producer等待broker返回ack,partition的lea...