關於golang接入Kafka訊息佇列的記錄

2021-09-26 01:52:02 字數 1850 閱讀 5522

version: '2'

services:

zookeeper:

image: wurstmeister/zookeeper

ports:

- "2181:2181"

kafka:

build: .

image: wurstmeister/kafka

volumes:

- /var/run/docker.sock:/var/run/docker.sock

ports:

- "9092:9092"

- "29092:29092"

environment:

kafka_zookeeper_connect: zookeeper:2181

kafka_listener_security_protocol_map: plaintext:plaintext,plaintext_host:plaintext

kafka_listeners: plaintext://:9092,plaintext_host://:29092

kafka_advertised_listeners: plaintext://kafka:9092,plaintext_host://localhost:29092

kafka-manager:

image: sheepkiller/kafka-manager

environment:

zk_hosts: zookeeper

ports:

- "9000:9000"

開發實施:

publisher 資訊發布模組

參考sarama的example, 操作的簡單程度與將key/value鍵值對放到redis類似。若沒有特殊要求, 以下的簡單配置便能滿足生產需求:

[kafka]

brokers = ["***.***.***.***:9092,"***.***.***.***:9092","***.***.***.***:9092"]

verbose = true

max_retry = 10

flush_frenquency = 500

topic = "***x"

publisher可分布式多例項部署和執行,kafka會處理多例項寫入問題。

consumer 消費者模組

參考sarama consumer group的example,sarama提供kafka consumer group的介面, 將example改為從配置檔案讀入, 關注consumer group的配置即可。

brokers =["***.***.***.***:9092,"***.***.***.***:9092","***.***.***.***:9092"]

group = "***"

topics = ["***"] # topic to be subscribed

clientid = "***"

verbose = true # debug info output

oldest = true # read from oldest log

注意:

關於kafka connect

kafka connect 可以幫助使用者將既有系統的資料轉換成資料流。如mysql -> kafka

關於kafka的使用場景

Elk接入Kafka資料線上實戰

線上部署配置 問題和錯誤 此篇文章記錄個人使用大資料開源框架elk,實際應用於公司日誌處理專案 logstash是資料收集系統。主要是用來日誌的蒐集 分析 過濾日誌的工具,支援大量的資料獲取方式。可以部署成c s架構,client端安裝在需要收集日誌的主機上,server端負責將收到的各節點日誌進行...

Kafka 集群 Golang 應用例項

專案見 kafka cluster example 這個例項做了些什麼?搭建了擁有 3 節點 kafka 3 節點 zookeeper 的 docker 集群服務 分別建立了 1 個訊息發布者和 2 個相同消費組的訊息訂閱者的 docker 應用 使用ab進行併發測試,驗證該例項訊息的訂閱 發布功能...

Golang之傳送訊息至kafka

3 重新命名conf zoo sample.cfg 為conf zoo.cfg 4 編輯 conf zoo.cfg,修改datadir d zookeeper 3.3.6 data 4 執行bin zkserver.cmd 啟動結果如下 2 開啟config目錄下的server.properties...