Kafka命令列操作及常用API

2022-09-17 04:06:12 字數 2097 閱讀 8325

一、kafka命令列操作

1.檢視當前集群已存在的主題

bin/kafka-topic.sh --zookeeper hd09-01:2181 --list

2.建立主題

bin/kafka-topic.sh --zookeeper hd09-01:2181 --create --relication-factor 3 \

>--partition 1 \

>---topic xinnian

3.刪除主題

bin/kafka-topic.sh --zookeeper hd09-01:2181 --delete --topic xinnian

4.啟動生產者傳送訊息(相當於直接建立主題)

bin/kafka-console-producer.sh --broker-list hd09-01:9092 --topic xinnian

5.啟動消費者接收訊息

bin/kafka-console-consumer.sh --bootstrap-server hd09-01:9092 \

>--topic xinnian

>--from-beginning

6.檢視主題的詳細資訊

bin/kafka-topic.sh --zookeeper hd09-01:2181 --describe --topic xinnian

二、kafka的常用api(生產者和消費者)

注意!!!

在本地編譯器上編寫kafka的生產者消費者api時,要在本地的hosts檔案中新增對映!!

1.生產者

1)配置生產者屬性(kafka節點位址,是否等待應答,傳送訊息失敗是否重試,批處理訊息大小,批處理資料延遲,記憶體緩衝,序列化等)生產者傳送資料,只需序列化

properties prop = new properties();

prop.put("","");

2)例項化生產者

kafkaproducerproducer = new kafkaproducer(prop);

3)生產者傳送訊息(使用只含有producerrecord的send方法)

for(~)

4)關閉生產者資源

producer.close();

/*

* * @author: princesshug

* @date: 2019/2/28, 16:36

* @blog:

*/public

class

prodecer

producer.close();

}}

2.消費者

1)配置消費者屬性(伺服器位址,消費者組,自動確認偏移量,反序列化)

properties prop = new properties();

2)例項化消費者(執行緒安全,把consumer定義為常量)

final kafkaconsumerconsumer = new kafkaconsumer(prop);

3)釋放資源,執行緒安全

public class runtime extends object

每個j**a應用程式都有乙個runtime類的runtime ,允許應用程式與執行應用程式的環境進行介面。

void addshutdownhook(thread hook) =>註冊乙個新的虛擬機關機掛鉤。

static runtime getruntime() =>返回與當前j**a應用程式關聯的執行時物件。   

4)訂閱訊息主題,消費者拉取訊息poll

while(true)

}

/**

* @author

: princesshug

* @date: 2019/2/28, 20:07

* @blog:

*/public

class

consumerdemo

}}));

//消費者訂閱主題

consumer.subscribe(arrays.aslist("xinnian"));

//消費者拉取訊息

while (true

) }

}}

Kafka命令列操作

1 檢視當前伺服器中的所有topic atguigu hadoop102 kafka bin kafka topics.sh zookeeper hadoop102 2181 list2 建立topic atguigu hadoop102 kafka bin kafka topics.sh zook...

Kafka 命令列操作

1.檢視當前伺服器中的所有 topic bin kafka topics.sh zookeeper localhost 2181 list2.建立 topic bin kafka topics.sh zookeeper localhost 2181 create replication factor...

Kafka命令列操作

1.檢視當前伺服器中的所有topicbin kafka topics.sh zookeeper bigdata13 2181 list2.建立topicbin kafka topics.sh zookeeper bigdata13 2181 create replication factor 3 p...