kafka集群安裝部署及常用命令

2021-08-14 05:53:52 字數 4032 閱讀 6069

安裝前的準備工作(zk集群已經部署完畢)

l 關閉防火牆

chkconfig iptables off  && setenforce 0

l 建立使用者

groupadd realtime &&useradd realtime

&& usermod -a -g realtime realtime

l 建立工作目錄並賦權

mkdir /export

mkdir /export/servers

chmod 755 -r /export

l 切換到realtime使用者下

su realtime

在linux中使用

wget

wget

安裝包 請參見

資料-安裝包

5.3.2 解壓安裝包

tar -zxvf kafka_2.11-1.0.0.tgz -c  /export/servers/

cd /export/servers/

mv kafka_2.11-1.0.0 kafka

5.3.3 修改配置檔案

cp   /export/servers/kafka/config/server.properties

/export/servers/kafka/config/server.properties.bak

vi  /export/servers/kafka/config/server.properties

輸入以下內容:

#broker的全域性唯一編號,不能重複

broker.id=0

#用來監聽鏈結的埠,

producer

或consumer

將在此埠建立連線

port=9092

#處理網路請求的執行緒數量

num.network.threads=3

#用來處理磁碟

io的執行緒數量

num.io.threads=8

#傳送套接字的緩衝區大小

socket.send.buffer.bytes=102400

#接受套接字的緩衝區大小

socket.receive.buffer.bytes=102400

#請求套接字的緩衝區大小

socket.request.max.bytes=104857600

#kafka執行日誌存放的路徑

log.dirs=/export/servers/logs/kafka

#topic在當前

broker

上的分片個數

num.partitions=2

#用來恢復和清理

data

下資料的執行緒數量

num.recovery.threads.per.data.dir=1

#segment檔案保留的最長時間,超時將被刪除

log.retention.hours=168

#滾動生成新的

segment

檔案的最大時間

log.roll.hours=168

#日誌檔案中每個

segment

的大小,預設為

1g log.segment.bytes=1073741824

#週期性檢查檔案大小的時間

log.retention.check.interval.ms=300000

#日誌清理是否開啟

log.cleaner.enable=true

#broker需要使用

zookeeper

儲存meta

資料 zookeeper.connect=192.168.52.106:2181,192.168.52.107:2181,192.168.52.108:2181

#zookeeper鏈結超時時間

zookeeper.connection.timeout.ms=6000

#partion buffer中,訊息的條數達到閾值,將觸發

flush

到磁碟 log.flush.interval.messages=10000

#訊息buffer

的時間,達到閾值,將觸發

flush

到磁碟 log.flush.interval.ms=3000

#刪除topic

需要server.properties

中設定delete.topic.enable=true

否則只是標記刪除

delete.topic.enable=true

#此處的

host.name

為本機ip(

重要),

如果不改

,則客戶端會丟擲

:producer connection to localhost:9092 unsuccessful 錯誤!

host.name=kafka01

5.3.4 分發安裝包

scp -r /export/servers/kafka  kafka02:/export/servers

然後分別在各機器上建立軟連

cd /export/servers/

scp -r /export/servers/kafka  kafka03:/export/servers

然後分別在各機器上建立軟連

cd /export/servers/

5.3.5 再次修改配置檔案(重要)

依次修改各伺服器上配置檔案的的broker.id,分別是

0,1,2

不得重複。

5.3.6 啟動集群

依次在各節點上啟動kafka

nohup /export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 &

輸出錯誤日誌到黑洞

command >/dev/null 2>&1 & 

l 檢視當前伺服器中的所有topic

bin/kafka-topics.sh --list --zookeeper  zk01:2181

l 建立topic

bin/kafka-topics.sh --create --zookeeper zk01:2181

--replication-factor 1 --partitions 1 --topic test

l 刪除topic

bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

需要server.properties中設定

delete.topic.enable=true

否則只是標記刪除或者直接重啟。

l 通過shell命令傳送訊息

bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test

l 通過shell消費訊息

bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test

l 檢視消費位置

bin/kafka-run-class.sh kafka.tools.consumeroffsetchecker --zookeeper zk01:2181 --group testgroup

l 檢視某個topic的詳情

bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181

l 對分割槽數進行修改

bin/kafka-topics.sh --zookeeper  zk01 --alter --partitions 2 --topic test

kafka集群安裝部署

wget 2.tar zxvf kafka 2.12 2.1.0.tgz 解壓安裝包 3.備份kafka zookeeper配置檔案 cp server.properties server.properties.bak cp zookeeper.properties zookeeper.proper...

kafka集群安裝部署

kakfa集群部署 實驗室3臺機器 34,35,36 1 到官網 2 解壓到36伺服器 cd utxt soft qydx tar zxvf kafka 2.12 1.1.0.tgz 3 cd kafka 2.12 1.1.0 config 4 修改配置檔案 vi server.properties...

安裝部署Kafka集群

kafka是乙個開源的分布式訊息訂閱系統 訊息中介軟體 安裝過程 2.上傳至 usr local src 3.解壓縮,並且移動到上級目錄 4.進入主目錄的config子目錄,5.修改server.properties配置檔案 vim server.properties內容如下 6.儲存並退出 7.主...