kafka常用命令

2021-09-13 20:08:26 字數 4392 閱讀 2323

安裝

# 安裝zookeeper

brew info zookeeper

brew install zookeeper

# 安裝路徑

ls /usr/local/celler/zookeeper

# 配置資訊路徑

ls /usr/local/etc/zookeeper

啟動

# 啟動服務端

./zkserver start

# 啟動客戶端

./zkcli start

zkcli命令

# 檢視當前節點資訊

ls /

# 檢視當前節點資料及更新次數等資訊

ls2 /

# 建立節點並設定關聯值

create /node_test "test"

# 建立乙個新的空節點

create /node_test ""

# 獲取節點內容

get /node_test

# 修改節點內容

set /node_test "new text balabala..."

# 刪除節點

delete /node_test

# 刪除節點及子節點

rmr /node_test

# 連線到其他zookeeper伺服器

connect host:port

# 關閉連線

close

# 操作指令歷史

history

# 重複執行某個指令,使用格式 redo id,id為histroy命令查詢出來的編號

redo

四字命令

# 使用示例 查詢服務配置資訊

echo conf | nc 127.0.0.1 2181

安裝

brew info kafka

brew install kafka

# 安裝路徑

ls /usr/local/celler/kafka

# 配置資訊路徑

ls /usr/local/etc/kafka

啟動

# 後台啟動

./kafka-server-start -daemon /usr/local/etc/kafka/server.properties

./kafka-server-start /usr/local/etc/kafka/server.properties.

kafka-topic使用

# 查詢topics

kafka-topics --list --zookeeper localhost:2181

# 建立

kafka-topics --create --zookeeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test

# 查詢topic詳情

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

kafka-console-producer/kafka-console-consumer

# 生產者 (阻塞)

kafka-console-producer --broker-list localhost:9092 --topic test

# 消費者 (阻塞)

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test

kafka-consumer-groups

# 檢視consumer group消費情況

kafka-consumer-groups --new-consumer --bootstrap-server 127.0.0.1:9092 --group testgroup --describe

# 檢視consumer group消費情況

kafka-consumer-groups -bootstrap-server localhost:9092 --group testgroup --describe

# 初始化consumer group 的offset,需先停掉消費者 --to-earliest

kafka-consumer-groups --bootstrap-server localhost:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute

# 將consumer group 的offset置為最新(不消費舊的資料),需先停掉消費者 --to-latest

kafka-consumer-groups --bootstrap-server localhost:9092 --group testgroup --topic test --reset-offsets --to-latest --execute

# 設定任意偏移量 --to-offset

$ kafka-consumer-groups --bootstrap-server localhost:9092 --group testgroup --topic test --reset-offsets --to-offset 1 --execute

刪除topic步驟

如果被刪除的topic正在被程式使用(生產/消費),先停止這些占用的程式。因為如果有程式正在生產或者消費該topic,則該topic的offset資訊一致會在broker更新。呼叫kafka delete命令則無法刪除該topic。同時,需要設定 auto.create.topics.enable = false,預設設定為true。如果設定為true,則produce或者fetch 不存在的topic也會自動建立這個topic。這樣會給刪除topic帶來很多意向不到的問題。所以,這一步很重要,必須設定auto.create.topics.enable = false,並認真把生產和消費程式徹底全部停止。

server.properties設定 delete.topic.enable=true

如果沒有設定 delete.topic.enable=true,則呼叫kafka 的delete命令無法真正將topic刪除,而是顯示(marked for deletion)

呼叫命令刪除topic:

kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】

刪除kafka儲存目錄(server.properties檔案log.dirs配置,預設為"/data/kafka-logs")相關topic的資料目錄。

注意:如果kafka 有多個 broker,且每個broker 配置了多個資料盤(比如 /data/kafka-logs,/data1/kafka-logs …),且topic也有多個分割槽和replica,則需要對所有broker的所有資料盤進行掃瞄,刪除該topic的所有分割槽資料。

找一台部署了zk的伺服器,使用命令:

bin/zkcli.sh -server 【zookeeper server:port】

登入到zk shell,然後找到topic所在的目錄:ls /brokers/topics,找到要刪除的topic,然後執行命令:

rmr /brokers/topics/【topic name】

即可,此時topic被徹底刪除。

如果topic 是被標記為 marked for deletion,則通過命令 ls /admin/delete_topics,找到要刪除的topic,然後執行命令:

rmr /admin/delete_topics/【topic name】

備註:rmr /consumers/【consumer-group】

rmr /config/topics/【topic name】

其實正常情況是不需要進行這兩個操作的,如果需要,那都是由於操作不當導致的。比如step1停止生產和消費程式沒有做,step2沒有正確配置。也就是說,正常情況下嚴格按照step1 – step5 的步驟,是一定能夠正常刪除topic的。

完成之後,呼叫命令:

./bin/kafka-topics.sh --list --zookeeper 【zookeeper server:port】

檢視現在kafka的topic資訊。正常情況下刪除的topic就不會再顯示。

但是,如果還能夠查詢到刪除的topic,則重啟zk和kafka即可。 |

kafka常用命令

kafka常用操作命令 l檢視當前伺服器中的所有topic bin kafka topics.sh list zookeeper hadoop02 2181 l建立topic kafka topics.sh create zookeeper hadoop02 2181 replication fac...

kafka常用命令

啟動集群 nohup bin kafka server start.sh config server.properties 建立topic kafka topics topictt replication factor3 partitions3 create zookeeper hadoop1 21...

Kafka 常用命令

kafka console producer broker list 127.0.0.1 9092 topic mytopic kafka console consumer bootstrap server 127.0.0.1 9092 topic mytopic 建立主題,replication ...