golang實現kafka的訊息推送

2022-08-11 07:09:13 字數 4555 閱讀 8344

kafka中涉及的名詞

訊息記錄:由乙個key,乙個value和乙個時間戳構成,訊息最終儲存在主題下的分割槽中,記錄在生產中稱為生產者記錄,在消費者中稱為消費記錄。kafka集群保持了所有發布的訊息,直到它們過期,無論訊息是否被消費了,在乙個可配置的時間段內,kafka集群保留了所有發布的訊息。比如訊息的儲存策略被設定為2天,那麼在乙個訊息被發布的兩天時間內,它都是可以被消費的。kafka的效能是和資料量無關的常量級的,所以保留太多資料並不是問題

生成者:生產者用於發布訊息

消費者:消費者用於訂閱訊息

消費者組:相同的groupid的消費者將視為同乙個消費者組,每個消費者都需要設定乙個組id,每條訊息只能被consumer group中的乙個consumer消費,但是可以被多個consumer group消費

主題(topic):訊息的一種邏輯分組,用於對訊息分門別類,每一類訊息稱之為乙個主題,相同主題的訊息放在乙個佇列中

分割槽(partition):訊息的一種物理分組,乙個主題被拆成多個分割槽,每乙個分割槽就是乙個順序的,不可變的訊息佇列,並且可以持續新增,分割槽中的每個訊息都被分配了乙個唯一的id,稱之為偏移量(offset),在每個分割槽中偏移量都是唯一的。每個分割槽對應乙個邏輯log,有多個segment組成

偏移量:分割槽中每個訊息都有乙個唯一的id,稱之為偏移量,代表已經消費的位置

**(broker):一台kafka伺服器稱之為乙個broker

副本(replica):副本只是乙個分割槽(partition)的備份。副本不讀取或寫入資料。它們用於防止資料丟失

領導者:leader是負責給定分割槽的所有讀取和寫入的節點

追隨者:跟隨領導者指令的節點被稱為follower。

zookeeper:kafka**是無狀態的,所以它們使用zookeeper來維護它們的集群狀態。zookeeper用於管理和協調kafka**

kafka功能

一. mac版安裝

brew install kafka
安裝kafka需要依賴zookeeper的,所以安裝kafka的時候也會包含zooker

server.properties中重要配置

broker.id=0

listeners=plaintext://:9092

advertised.listeners=plaintext:

log.dirs=/usr/local/var/lib/kafka-logs

zookeeper.properties重要配置

datadir=/usr/local/var/lib/zookeeper

clientport=2181

maxclientcnxns=0

二. 啟動zookeeper
新建立終端啟動zookeeper

cd /usr/local/cellar/kafka/2.1.0

./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

列印台顯示:info reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.quorumpeerconfig)

...即是啟動成功

三.啟動kafka
新建立終端啟動kafka(啟動kafka之前必須先啟動zookeeper)

cd /usr/local/cellar/kafka/2.1.0

./bin/kafka-server-start /usr/local/etc/kafka/server.properties

列印台顯示:info registered kafka:type=kafka.log4jcontroller mbean (kafka.utils.log4jcontrollerregistration$)

...即啟動成功

啟動了kafka之後,zookeeper端會報一些error:keepererrorcode = nonode for /config/topics/test之類的錯誤,這個是沒有問題的,這是因為kafka向zookeeper傳送了關於該路徑的一些請求資訊,但是不存在,所以這是沒有問題的

四.建立topic
新建立終端

cd /usr/local/cellar/kafka/2.1.0

建立乙個名為「test」的主題:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

檢視所有的topic:./bin/kafka-topics --list --zookeeper localhost:2181

檢視某個topic的資訊,比如test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test

五.傳送訊息
新建立乙個終端,作為生產者,用於傳送訊息,每一行就是一條資訊,將訊息傳送到kafka伺服器

cd /usr/local/cellar/kafka/2.1.0

./bin/kafka-console-producer --broker-list localhost:9092 --topic test

send one message

send two message

六.消費訊息(接受訊息)
新建立乙個終端作為消費者,接受訊息

cd /usr/local/cellar/kafka/2.1.0

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

send one message

send two message(這些便是從生產者獲得的訊息)

注意:傳送訊息與接受訊息必須啟動kafka與zookeeper生產者
import (

//構建傳送的訊息,

msg := &sarama.producermessage

var value string

var msgtype string

for

fmt.scanf("%s",&msgtype)

fmt.println("msgtype = ",msgtype,",value = ",value)

msg.topic = msgtype

//將字串轉換為位元組陣列

msg.value = sarama.byteencoder(value)

//sendmessage:該方法是生產者生產給定的訊息

//生產成功的時候返回該訊息的分割槽和所在的偏移量

//生產失敗的時候返回error

partition, offset, err := producer.sendmessage(msg)

if err != nil

fmt.printf("partition = %d, offset=%d\n", partition, offset)

}}

消費者
import (

//partitions(topic):該方法返回了該topic的所有分割槽id

partitionlist, err := consumer.partitions("test")

if err != nil

for partition := range partitionlist

defer pc.asyncclose()

wg.add(1)

go func(sarama.partitionconsumer)

}(pc)

} wg.wait()

consumer.close()

}

流量削峰在訊息佇列中也是常用場景,一般在秒殺或**活動中使用比較廣泛。當流量太大的時候達到伺服器瓶頸的時候可以將事件放在kafka中,下游伺服器當接收到訊息的時候自己去消費,有效防止伺服器被擠垮

訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊中,比如客戶端a跟客戶端b都使用同一佇列進行訊息通訊,客戶端a,客戶端b,客戶端n都訂閱了同乙個主題進行訊息發布和接受不了實現類似聊天室效果

參考**

如何基於sqlite實現kafka延時訊息詳解

目錄 延時訊息 或者說定時訊息 是業務系統裡乙個常見的功能點。常用業務場景如 1 訂單超時取消 2 離線超過指定時間的使用者,召回通知 3 手機消失多久後通知監護人 現流行的實現方案主要有 1 資料庫定時輪詢,掃瞄到達到延時時間的記錄,業務處理,刪除該記錄 2 jdk 自帶延時佇列 delayque...

kafka 檢視待消費資料 kafka檢視消費資料

kafka檢視消費資料 一 如何檢視 在老版本中,使用kafka run class.sh 指令碼進行檢視。但是對於最新版本,kafka run class.sh 已經不能使用,必須使用另外乙個指令碼才行,它就是kafka consumer groups.sh 普通版檢視所有組 要想查詢消費資料,必...

kafka 檢視待消費資料 kafka檢視消費資料

一 如何檢視 在老版本中,使用kafka run class.sh 指令碼進行檢視。但是對於最新版本,kafka run class.sh 已經不能使用,必須使用另外乙個指令碼才行,它就是kafka consumer groups.sh 普通版檢視所有組 要想查詢消費資料,必須要指定組。那麼線上執行...