kafka基本特性簡介

2021-07-27 11:42:50 字數 2889 閱讀 5987

kafka是linkedin開發的用於日誌資料處理的流式訊息處理系統。官網上說kafka is a distributed、partitioned、replicated commit logservice.這句話充分體現了kafka的特性。kafka是首先是乙個用於處理流式資料的日誌處理系統,然後他是分布式的,他支援分割槽便於橫向拓展,他具有冗餘備份功能。

目前kafka主要有三個方面的用途,分別是訊息訂閱與發布系統、訊息儲存、流式資料處理。本文主要圍繞kafka的訊息訂閱與發布功能進行介紹。

topic主題是kafka對流的乙個抽象。主題是乙個分類,例如新聞可以分為體育、戰爭、科技等話題,不同的主題有不同的發布者和訂閱者。

partiton是kafka的乙個重要概念,是乙個主題在物理上的分組。如下圖所示,乙個topic被分成了3個partition,每個partition都是乙個佇列結構。partition可以存在於不同的機器上,這樣的好處是實現了負載均衡,不同的consumer可以從不同的partition中獲取資料,partition的這種結構還便於集群的橫向拓展。

producer用於向kafka集**送訊息,訊息必須具有主題。

consumers是消費者,向kafka獲取訊息。kafka作為訊息佇列靈活的地方在於它提出了consumer group的概念,consumer group內部可以有乙個到多個consumer。consumer group的好處在於:

1、負載均衡

乙個consumer group裡的consumer應該是功能相近的,這些consumer可以在不同的機器上,多個consumer共同完成同乙個group的任務能夠實現負載均衡。

2、保持佇列的順序一致性

乙個topic的乙個partition只對應乙個consumer,這樣的好處就是由於乙個partition裡的訊息是按照先後順序的,那麼交給乙個consumer來處理也會保持先後順序。

下面是乙個例子,假如某個topic具有3個partition,而乙個具有3個consumer的group訂閱了該訊息,此時,每個partition會一對一地被consumer消費。此時增加乙個partition,那麼3個consumer中會有乙個要多處理乙個partition的訊息。而如果增加的是乙個consumer的話,這個consumer會空閒。

properties props = new properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("group.id", "test");

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "1000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");

kafkaconsumerconsumer = new kafkaconsumer<>(props);

consumer.subscribe(arrays.aslist("foo","bar"))

while(true)

}

如下**所示為手動提交offset更改,值得注意的是在consumer的配置中,「enable.auto.commit」被設定成了false,通過consumer.commitsync()來提交offset。

properties props = new properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("group.id", "test");

props.put("enable.auto.commit", "false");

props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");

kafkaconsumerconsumer = new kafkaconsumer<>(props);

consumer.subscribe(arrays.aslist("foo", "bar"));

final int minbatchsize = 200;

list> buffer = new arraylist<>();

while (true)

if (buffer.size() >= minbatchsize)

}

除此之外還能進行更加精細地更改,精細到partition級別,如下**所示針對每個partition的訊息,使用consumer.commitsync(collections.singletonmap(partition, new offsetandmetadata(lastoffset + 1)))。

try 

long lastoffset = partitionrecords.get(partitionrecords.size() - 1).offset();

consumer.commitsync(collections.singletonmap(partition, new offsetandmetadata(lastoffset + 1)));}}

} finally

Qt基本特性簡介

qt不只是介面庫,qt提供了功能豐富的c 類庫,比如網路程式設計,資料庫查詢,xml解析,md5加密等 從系統得到的訊息,比如滑鼠,鍵盤等。qt事件迴圈的時候讀取這些事件,轉換為qevent後依次派發到對應視窗進行處理。從低到高逐漸可以分為如下步驟 qmetaobject connection co...

kafka基本概念簡介

按照官方的說法,kafka是乙個分布式流平台。實際使用中kafak主要作為乙個中間人,提供資料非同步處理的能力,同時可以對資料進行不同的處理。比如不同的消費者,對資料進行不一樣的邏輯處理 topic 主題 就是同一類訊息的名稱,例如資料庫的乙個表裡面儲存了同一種型別的資料,同乙個主題處理的也是同一類...

Kafka 簡介及基本概念

6.other kafka 是乙個分布式流平台 apache kafka is a distributed streaming platform 功能 應用 幾個認識 1.zookeeper 註冊中心 zookeeper 乙個 分布式協調框架 管理 kafka 集群中的 broker 節點 bin ...