五分鐘帶你玩轉rocketMQ(五)實戰廣播與集群

2021-10-01 07:24:40 字數 2986 閱讀 9990

1.集群消費方式

乙個consumergroup中的consumer例項平均分攤消費生產者傳送的訊息。例如某個topic有九條訊息,其中乙個consumer group有三個例項(可能是3個程序,或者3臺機器),那麼每個例項只消費其中的3條訊息,consumer不指定消費方式的話預設是集群消費的,適用於大部分訊息的業務

2.廣播消費方式

一條訊息被多個consumer消費,幾十這些consumer屬於同乙個consumergroup,訊息也會被consumergroup中的每個consumer消費一次,廣播消費中的consumergroup概念可以認為在訊息劃分層面沒有意義,適用於一些分發訊息的場景,比如我訂單下單成功了,需要通知財務系統,客服系統等等這種分發的場景,可以通過修改consumer中的messagemodel來設定消費方式為廣播消費

package cn.baocl.rocketmq.consumer;

import cn.baocl.rocketmq.processor.mqconsumemsglistenerprocessor;

import com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;

import com.alibaba.rocketmq.client.exception.mqclientexception;

import com.alibaba.rocketmq.common.consumer.consumefromwhere;

import com.alibaba.rocketmq.common.protocol.heartbeat.messagemodel;

import org.slf4j.logger;

import org.slf4j.logge***ctory;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.beans.factory.annotation.value;

import org.springframework.boot.springbootconfiguration;

import org.springframework.context.annotation.bean;

import org.springframework.util.stringutils;

@springbootconfiguration

public class mqconsumerconfiguration ")

private string namesrvaddr;

@value("$")

private string groupname;

@value("$")

private int consumethreadmin;

@value("$")

private int consumethreadmax;

@value("$")

private string topics;

@value("$")

private int consumemessagebatchmaxsize;

@autowired

private mqconsumemsglistenerprocessor mqmessagelistenerprocessor;

@bean

public defaultmqpushconsumer testrocketmqconsumer() throws exception

if (stringutils.isempty(namesrvaddr))

if(stringutils.isempty(topics))

defaultmqpushconsumer consumer = new defaultmqpushconsumer(groupname);

consumer.setnamesrvaddr(namesrvaddr);

consumer.setconsumethreadmin(consumethreadmin);

consumer.setconsumethreadmax(consumethreadmax);

consumer.registermessagelistener(mqmessagelistenerprocessor);

/*** 設定consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費

* 如果非第一次啟動,那麼按照上次消費的位置繼續消費

*/consumer.setconsumefromwhere(consumefromwhere.consume_from_last_offset);

/*** 設定消費模型,集群還是廣播,預設為集群

*///廣播

consumer.setmessagemodel(messagemodel.broadcasting);

//集群

/*** 設定一次消費訊息的條數,預設為1條

*/consumer.setconsumemessagebatchmaxsize(consumemessagebatchmaxsize);

try

consumer.start();

logger.info("consumer is start !!! groupname:{},topics:{},namesrvaddr:{}",groupname,topics,namesrvaddr);

}catch (mqclientexception e),topics:{},namesrvaddr:{}",groupname,topics,namesrvaddr,e);

throw new exception(e);

}return consumer;

}}

//廣播 

consumer.setmessagemodel(messagemodel.broadcasting);

//集群

五分鐘帶你玩轉mybatis(二)常用標籤

open 以什麼開始 close 以什麼結束 separator 分隔符 collection list名稱 item index名稱 引用sql標籤 typename sort 在set時候省略最後乙個符號 code itemname criteria prefix 字首覆蓋並增加其內容 suff...

五分鐘玩轉git

許多人認為git太混亂,或認為它是一種複雜的版本控制系統,其實不然,這篇文章有助於大家快速上手使用git。使用git前,需要先建立乙個倉庫 repository 您可以使用乙個已經存在的目錄作為git倉庫或建立乙個空目錄。使用您當前目錄作為git倉庫,我們只需使它初始化。git init使用我們指定...

來吧,1分鐘帶你玩轉Kafka

摘要 kafka讓人又愛又恨?來吧,一分鐘帶你玩轉它 說起kafka,許多使用者對它是又愛又恨。kafka是一種分布式的 基於發布 訂閱的訊息系統,其極致體驗讓人欲罷不能,但操心的運維 複雜的安全策略 可靠性易用性的缺失 算不上極致的效能發揮 並不豐富的訊息服務功能,仍需要使用者付出諸多的背後工作。...