go 實現 kafka 訊息傳送 接收

2021-09-11 00:21:28 字數 4804 閱讀 5257

kafka是訊息中介軟體的一種,是一種分布式流平台,是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast(**快)等優點。

生產者(producer)將訊息記錄(record)傳送到kafka中的主題中(topic), 乙個主題可以有多個分割槽(partition), 訊息最終儲存在分割槽中,消費者(consumer)最終從主題的分割槽中獲取訊息。

本文主要針對mac系統進行的操作。

安裝

brew install kafka
kafka的安裝目錄:/usr/local/cellar/kafka

kafka的配置檔案目錄:/usr/local/etc/kafka

kafka服務的配置檔案:/usr/local/etc/kafka/server.properties

zookeeper配置檔案: /usr/local/etc/kafka/zookeeper.properties

#修改server.properties

vim /usr/local/etc/kafka/server.properties

#增加一行配置

listeners=plaintext://localhost:9092

啟動 zookeeper
# 新起乙個終端啟動zookeeper

zkserver start

或者

cd  /usr/local/cellar/kafka/2.1.0

# 新起乙個終端啟動zookeeper

./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

啟動 kafka
cd  /usr/local/cellar/kafka/2.1.0

# 新起乙個終端啟動zookeeper

./bin/kafka-server-start /usr/local/etc/kafka/server.properties

kafka 服務也可以很優雅的進行關閉,首先要把server配置檔案新增如下項:

vim /usr/local/etc/kafka/server.properties

#新增一行

ontrolled.shutdown.enable=true

然後就可以通過bin目錄下 zookeeper-server-stop.sh 關閉 kafka 服務了。

當 kafka 在啟動過程**現問題的時候,可以嘗試採用以下的操作:

1、到 /usr/local/var/lib 目錄下刪除 kafka-logs 目錄

2、重啟 kafka

當 zookeeper 和 kafka 完成啟動後,可以在命令終端輸入以下命令:

jps
可以看到如下內容,說明啟動成功。

建立 topic

# 建立乙個名為「test」的主題,該主題有1個分割槽

./bin/kafka-topics --create \

--zookeeper localhost:2181 \

--partitions 1 \

--replication-factor 1 \

--topic test

如果分割槽配置錯誤,可以進行下述操作進行刪除:

# 刪除分割槽

./bin/kafka-topics --create \

--zookeeper localhost:2181 \

--partitions 1 \

--replication-factor 1 \

--topic test \

--delete-config

刪除 topic的時候,首先要把 server 配置檔案新增如下項:

vim /usr/local/etc/kafka/server.properties

#新增一行

delete.topic.enable=true

#然後可以執行

./bin/kafka-topics --delete --topic test

檢視 topic
# 建立成功可以通過 list 列舉所有的主題

./bin/kafka-topics --list --zookeeper localhost:2181

# 檢視某個主題的資訊

./bin/kafka-topics --describe --zookeeper localhost:2181 --topic

生產訊息(傳送訊息)
# 新起乙個終端,作為生產者,用於傳送訊息,每一行算一條訊息,將訊息傳送到kafka伺服器

cd /usr/local/cellar/kafka/2.1.0

./bin/kafka-console-producer --broker-list localhost:9092 --topic test

> this is a message

消費訊息(接收訊息)
# 新起乙個終端作為消費者,接收訊息

cd /usr/local/cellar/kafka/2.1.0

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

this is a message

在生產者傳送訊息

在生產訊息(傳送訊息)中新起的終端屬於一條訊息(任意字元),輸入完回車就算一條訊息,可以看到在步驟7中的消費者端就會顯示剛才輸入的訊息。準備

生產者**

producer.go

var address = string

func main()

//同步訊息模式

func syncproducer(address string)

defer p.close()

topic := "test"

srcvalue := "sync: this is a message. index=%d"

for i:=0; i<10; i++

part, offset, err := p.sendmessage(msg)

if err != nil else

time.sleep(2*time.second) }}

func saramaproducer() , config)

if e != nil

defer producer.asyncclose()

//迴圈判斷哪個通道傳送過來資料.

fmt.println("start goroutine")

go func(p sarama.asyncproducer)

} }(producer)

var value string

for i:=0;;i++

//將字串轉化為位元組陣列

msg.value = sarama.byteencoder(value)

//使用通道傳送

producer.input()

}}

消費者**

consumer.go

func main()  

var wg = &sync.waitgroup{}

wg.add(2)

//廣播式消費:消費者1

go clusterconsumer(wg, address, topic, "group-1")

//廣播式消費:消費者2

go clusterconsumer(wg, address, topic, "group-2")

wg.wait()

}// 支援brokers cluster的消費者

func clusterconsumer(wg *sync.waitgroup,brokers, topics string, groupid string)

defer consumer.close()

// trap sigint to trigger a shutdown

signals := make(chan os.signal, 1)

signal.notify(signals, os.interrupt)

// consume errors

go func()

}()// consume notifications

go func()

}()// consume messages, watch signals

var successes int

loop:

for

case

break loop

} }fmt.fprintf(os.stdout, "%s consume %d messages \n", groupid, successes)

}

流程說明:

注意 topic 的名字要與 producer.go 和 consumer.go 一致。

kafka 訊息傳送和接收

傳送 例項 public class kafkaproducerdemo extends thread override public void run else catch interruptedexception e catch executionexception e num try catc...

通過kafka傳送和接收訊息

生產者配置類 configuration enablekafka public class kafkaproducerconfig private string address value private string batchsize value private string linger pu...

接收kafka訊息

kafka server 127.0.0.1 8081,127.0.0.1 8082,127.0.0.1 8083,127.0.0.1 8084 topics eseal hr test 2 gourp id hthr value string servers value string groupi...