阿里RoceketMQ使用

2021-10-01 17:50:05 字數 3657 閱讀 7729

#該應用是否啟用生產者

rocketmq:

producer:

isonoff: on

#傳送同一類訊息的設定為同乙個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示

#mq的nameserver位址

namesrvaddr: 127.0.0.1:9876

#訊息最大長度 預設1024*4(4m)

maxmessagesize: 4096

#傳送訊息超時時間,預設3000

sendmsgtimeout: 3000

#傳送訊息失敗重試次數,預設2

retrytimeswhensendfailed: 2

###consumer

##該應用是否啟用消費者

consumer:

isonoff: on

#mq的nameserver位址

namesrvaddr: 127.0.0.1:9876

#該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;

topics: futaotopic~*;

consumethreadmin: 20

consumethreadmax: 64

#設定一次消費訊息的條數,預設為1條

consumemessagebatchmaxsize: 1

#失敗重試次數

reconsumertimes: 3

/**

* @program: wx-job

* @description: roceketmq

* @author: pengyd

* @create: 2019-10-10 14:44

**/@configuration

@slf4j

public class mqproducerconfig ")

private string producergroupname;

@value("$")

private string producernamesrvaddr;

@value("$")

private int maxmessagesize;

@value("$")

private int sendmsgtimeout;

@value("$")

private int retrytimeswhensendfailed;

@bean

public defaultmqproducer producer()

if (this.producernamesrvaddr.isempty())

defaultmqproducer defaultmqproducer = new defaultmqproducer(producergroupname);

defaultmqproducer.setnamesrvaddr(producernamesrvaddr);

defaultmqproducer.setmaxmessagesize(maxmessagesize);

defaultmqproducer.setsendmsgtimeout(sendmsgtimeout);

defaultmqproducer.setvipchannelenabled(false);

//訊息傳送到mq伺服器失敗重試次數

defaultmqproducer.setretrytimeswhensendfailed(retrytimeswhensendfailed);

try ,producergroupname:{}", producernamesrvaddr, producergroupname);

} catch (exception e) ", e.getmessage(), e);

}return defaultmqproducer;

}}

/**

* @program: wx-job

* @description: roceketmq

* @author: pengyd

* @create: 2019-10-10 14:44

**/@configuration

@slf4j

public class mqconsumerconfig ")

private string consumernamesrvaddr;

@value("$")

private string consumergroupname;

@value("$")

private int consumethreadmin;

@value("$")

private int consumethreadmax;

@value("$")

private string topics;

@value("$")

private int consumemessagebatchmaxsize;

private static final string topic = "test";

private static final string tag = "test";

@bean

public defaultmqpushconsumer consumer()

if (this.consumernamesrvaddr.isempty())

if (this.topics.isempty())

try ", msg);

system.out.println(consumeconcurrentlystatus.consume_success);

} else

//todo("開始正常的業務邏輯")

system.out.println("開始正常的業務邏輯:" + new string(msg.getbody(), standardcharsets.utf_8));

}return consumeconcurrentlystatus.consume_success;

}});

defaultmqpushconsumer.start();

log.info("rocketmq consumer start success; namesrvaddr:{},groupname:{},topics:{}", consumernamesrvaddr, consumergroupname, topics);

return defaultmqpushconsumer;

} catch (exception e) ", e.getmessage(), e);

return new defaultmqpushconsumer();

}}

/**

* filename: mqcontroller

* author: pengyd

* date: 2019/12/26

* function:

*/@controller

public class mqcontroller catch (exception e)

return "success";

}}

RoceketMQ 普通資訊

rocketmq有2種常見的消費模式,分別是defaultmqpushconsumer和defaultmqpullconsumer模式。兩種模式其本質都是拉取訊息,只是實現機制不一樣。defaultmqpushconsumer 推薦使用 consumer向broker發出請求,保持了一種長鏈結,br...

阿里雲ECS使用

按流量計費設定使用ali映象源 使用 epel 庫安裝 docker rpm elvm2 7 2.02 105 14 el7.x86 64 curl ssl sh 配置docker加速器 您可以使用如下的指令碼將mirror的配置新增到docker daemon的啟動引數中。系統要求 centos ...

阿里雲 簡單使用

1.遠端登入 ssh 使用者名稱 id 2.檔案遠端傳輸 scp 本機檔案位址 使用者名稱 ip 遠端儲存的路徑scp index.html root 193.188.88.888 var www wwscp node v12.18.3 linux x64.tar.xz jacky 193.188....