kafka基本操作

2021-09-19 16:04:35 字數 2447 閱讀 2513

一、topic

1.檢視全部topic

kafka-topics.sh --zookeeper localhost:2181 --list

2.建立topic,指定分割槽數目和備份因子

kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_name --partitions 3 --replication-factor 1

3.描述當前topic

kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_name

4.指定topic進行刪除

kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

二、生產與消費

1.指定topic,開啟乙個生產視窗

kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name

2.指定topic,開啟乙個消費視窗,--from-beginning表從頭消費,沒有beginning則監聽實時消費

kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_name --from-beginning

3.新的消費命令,zookeeper換成bootstrap-server,也可以給出group的名稱

kafka-console-consumer.sh --bootstrap-server borker_host1:port1,borker_host2:port2 --topic topic_name --group group_name --from-beginning --max-messages 100

三、消費組

1.檢視所有的消費組

kafka-consumer-groups.sh --bootstrap-server borker_host:9092 --list

2.檢視當前consumer_group下的消費情況

kafka-consumer-groups.sh --bootstrap-server borker_host:9092 --describe --group kafka_consumer_group_v1

3.重置offset的工具

a. kafka-consumer-groups.sh --bootstrap-server borker_host1:port1 --group group_name --reset-offsets --execute --to-offset new_offset --topic topic_name

b. kafka-consumer-groups.sh --bootstrap-server borker_host1:port1 --group group_name --reset-offsets --execute --to-earliest/--to-latest --topic topic_name

第一條,指定group和topic的offset修改到new_offset的位置,下次消費啟動後,消費將從指定的offset處消費。所有的分割槽都將使用這個值。

第二條,指定group和topic的offset修改到earliest或者latest位置,使得消費者從頭或者從尾部消費。

例如,消費者將test_topic的資訊全部都消費了,假設此時我想將每個分割槽的offset都拉到200去,從200往後重新消費,則通過命令:

c. kafka-consumer-groups.sh --bootstrap-server localhost:6067 --group yan --reset-offsets --execute --to-offset 200 --topic shxx_qq_info_old  --max-messages 10

對設定的offset進行檢查

d. kafka-consumer-groups.sh --bootstrap-server localhost:6067 --group yan --describe

4.刪除group

kafka-consumer-groups.sh --bootstrap-server localhost:6067 --group test-1 --delete

四、kafka版本檢視方法

kafka沒有像別的軟體一樣有,kafka -verison的命令,但是你可以檢視kafka/libs 下的庫檔案,知道kafka的版本號:

命令一find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

前面2.9.2是scala 的版本,後面0.8.1.1就是你kafka的版本。

命令二find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

kafka基本操作例項

1 啟動命令 任意節點建立 nohup kafka server start.sh 2 建立topic kafka topics.sh create zookeeper hadoop1 2181,hadoop2 2181,hadoop3 2181 replication factor 3 parti...

Kafka 基本概述 操作

點對點 一對一,消費者消費後立刻清除訊息 發布 訂閱模式 一對多,消費者消費資料之後不會清除訊息 producer 訊息生產者,想kafka broker傳送訊息的客戶端 consumer 訊息消費者,想kafka broker取訊息的客戶端 consumer group cg 消費者組,多個con...

Kafka 基本介紹

源自小夥伴的分享,我本身也不會使用這個東西,但是通過她的介紹,對kafka有乙個簡單的了解,基於此做乙個整理。1.什麼是kafka?kafka是乙個分布式 分割槽的 多副本的 多訂閱者,基於zookeeper協調的分布式日誌系統 也可以當做mq系統 常見可以用於web nginx日誌 訪問日誌,訊息...