kafka安裝與測試

2022-03-22 14:53:54 字數 2975 閱讀 7750

基於linux-centos7.0環境先進行測試學習

producer即生產者,向kafka集**送訊息,在傳送訊息之前,會對訊息進行分類,即topic,

topic即主題,通過對訊息指定主題可以將訊息分類,消費者可以只關注自己需要的topic中的訊息

consumer即消費者,消費者通過與kafka集群建立長連線的方式,不斷地從集群中拉取訊息,然後可以對這些訊息進行處理。

kafka官網

安裝

將二進位制檔案解壓到指定目錄

tar -zxvf kafa_2.11.tgz -c /opt/module

執行

kafka執行依賴於zookeeper,高版本內建zookeeper環境

a.啟動zookeeper環境

bin/zookeeper-server-start.sh   config/zookeeper.properties

啟動後不要關閉終端 新建連線執行命令

b.啟動kafka程序

bin/kafka-server-start.sh  config/server.properties

注意保持程序狀態活躍,新建終端執行操作

c.新建連線 ,建立topic

kafka_2.11-2.0.0]$ bin/kafka-topics.sh  --create --zookeeper localhost:2181

--replication-factor 1 --partitions 1 --topic test

replication-factor 副本的個數

partitions 分割槽數目

topic  主題名稱

檢視已經建立的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

d.建立生產者

bin/kafka-console-producer.sh  --broker-list localhost:9092 --topic test

broker-list 生產者的唯一標示

topic 訊息主題名稱

>輸入內容+enter

e.新建連線,建立消費者

bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic  test --from-beginning

from-beginning 從一開始就接收

顯示 輸入內容

kafka的儲存,分割槽partitions,建立乙個topic時,同時可以指定分割槽數目,分割槽數越多,其吞吐量也越大

kafka在接收到生產者傳送的訊息之後,會根據均衡策略將訊息儲存到不同的分割槽中。

生產者在向kafka集**送訊息的時候,可以通過指定分割槽來傳送到指定的分割槽中

也可以通過指定均衡策略來將訊息傳送到不同的分割槽中

如果不指定,就會採用預設的隨機均衡策略,將訊息隨機的儲存到不同的分割槽中

kafka常用命令:

1.檢視topic的詳細資訊

./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testkj1

2、為topic增加副本

./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute

3、建立topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkj1

4、為topic增加partition

./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testkj1

5、kafka生產者客戶端命令

./kafka-console-producer.sh --broker-list localhost:9092 --topic testkj1

6、kafka消費者客戶端命令

./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testkj1

7、kafka服務啟動

./kafka-server-start.sh -daemon ../config/server.properties

8、下線broker

./kafka-run-class.sh kafka.admin.shutdownbroker --zookeeper 127.0.0.1:2181 --broker #brokerid# --num.retries 3 --retry.interval.ms 60

shutdown broker

9、刪除topic

./kafka-run-class.sh kafka.admin.deletetopiccommand --topic testkj1 --zookeeper 127.0.0.1:2181

./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testkj1

10、檢視consumer組內消費的offset

./kafka-run-class.sh kafka.tools.consumeroffsetchecker --zookeeper localhost:2181 --group test --topic testkj1

./kafka-consumer-offset-checker.sh --zookeeper 192.168.100.1:12181 --group group1 --topic group1

Kafka安裝與測試

tar zxf kafka 2.11 2.1.0.tgz c usr local sudo mv usr local kafka 2.11 2.1.0.tgz usr local kafka sudo chown r hadoop usr local kafka 啟動zookeeper cd usr...

kafka 集群安裝與安裝測試

一 集群安裝 解壓 tar zxvf kafka 2.9.2 0.8.1.tgz rpm ivh sbt.rpm 3.更新scala環境 sbt update sbt package sbt assembly package dependency sbt sbt dependency 不同版本命令不...

Kafka安裝測試

wget 2 上傳至linux下並解壓 3 啟動服務 3.1 啟動zookeeper 啟動zk有兩種方式,第一種是使用kafka自己帶的乙個zk。bin zookeeper server start.sh config zookeeper.properties 另一種是使用其它的zookeeper,...