Docker: a single Kafka broker with multiple partitions


pull wurstmeister/zookeeper

sudo docker pull wurstmeister/zookeeper
pull wurstmeister/kafka

sudo docker pull wurstmeister/kafka
Start zookeeper

sudo docker run -d --name zookeeper -p 2181:2181 -t  wurstmeister/zookeeper
Start kafka

sudo docker run -d -t --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.18.166:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.166:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
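To check that the broker came up before continuing, the container logs can be tailed (plain Docker, nothing specific to this image):

sudo docker logs kafka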
In server.properties, set num.partitions=2, i.e. 2 partitions per newly created topic:

num.partitions=2
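As an alternative to the broker-wide default, a topic can also be created with an explicit partition count. A minimal sketch using kafka-python's admin client (the topic name and broker address are taken from the examples below; this step is optional and not part of the original walkthrough):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["192.168.18.166:9092"])
# create "peter.test1" with 2 partitions; replication_factor=1 because there is a single broker
admin.create_topics([NewTopic(name="peter.test1", num_partitions=2, replication_factor=1)])
admin.close()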
Restart the kafka container
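With the container name used above, that is simply:

sudo docker restart kafka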

Producer

import json

from kafka import KafkaProducer

def sendmsg(topic, msg_dict):
    '''send json string to kafka'''
    producer = KafkaProducer(bootstrap_servers=["192.168.18.166:9092"],
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send(topic, msg_dict)
    producer.close()

if __name__ == '__main__':
    for i in range(10):
        sendmsg("peter.test1", str(i) + '11')
        print("over" + str(i) + '10')
    # sendmsg("json", msg_dict)  # leftover from an earlier draft; msg_dict is not defined here

Two consumers, each assigned a specific partition. If no partition is assigned, a consumer consumes messages from all partitions.

# Consumer 1

import json
import logging
import traceback

from kafka import KafkaConsumer, TopicPartition

def main():
    # consumer = KafkaConsumer("peter.test_cluser", group_id="peter_consumer_cluser3", ...)  # earlier subscribe-style variant, left commented out
    consumer = KafkaConsumer(group_id="peter_consumer_cluser1",
                             max_poll_records=5,
                             max_poll_interval_ms=600000,
                             # enable_auto_commit=False,
                             bootstrap_servers=["192.168.18.166:9092"],
                             value_deserializer=json.loads)
    print("start consumer", str(consumer))
    consumer.assign([TopicPartition('peter.test1', 0)])  # pin this consumer to topic 'peter.test1', partition 0
    for message in consumer:
        # print(str(message.offset))
        print("receive label message")
        if message:
            try:
                print("@@@@@ ---> consumer_cluser1 get new message ", str(message.value))
                # consumer.commit()
            except Exception as e:
                logging.error("@@----> exception : ")
                logging.error(e)
                traceback.print_exc()

if __name__ == '__main__':
    main()

Consumer 2

import json
import logging
import traceback

from kafka import KafkaConsumer, TopicPartition

def main():
    # consumer = KafkaConsumer("peter.test_cluser", group_id="peter_consumer_cluser3", ...)  # earlier subscribe-style variant, left commented out
    consumer = KafkaConsumer(group_id="peter_consumer_cluser2",
                             max_poll_records=5,
                             max_poll_interval_ms=600000,
                             # enable_auto_commit=False,
                             bootstrap_servers=["192.168.18.166:9092"],
                             value_deserializer=json.loads)
    print("start consumer", str(consumer))
    consumer.assign([TopicPartition('peter.test1', 1)])  # pin this consumer to topic 'peter.test1', partition 1
    for message in consumer:
        # print(str(message.offset))
        print("receive label message")
        if message:
            try:
                print("@@@@@ ---> consumer_cluser2 get new message ", str(message.value))
                # consumer.commit()
            except Exception as e:
                logging.error("@@----> exception : ")
                logging.error(e)
                traceback.print_exc()

if __name__ == '__main__':
    main()
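For contrast with the manual assign() above: if the two consumers share a group_id and subscribe to the topic instead, Kafka's group coordinator spreads the two partitions across them automatically. A minimal sketch (the group name "peter_consumer_group" is illustrative, not from the original):

import json

from kafka import KafkaConsumer

# subscribing (rather than assigning) lets the group coordinator balance
# partitions across every consumer that shares this group_id
consumer = KafkaConsumer("peter.test1",
                         group_id="peter_consumer_group",
                         bootstrap_servers=["192.168.18.166:9092"],
                         value_deserializer=json.loads)
for message in consumer:
    print(message.partition, message.offset, message.value)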
