RocketMq傳送訊息Producer配置

2021-10-05 16:13:25 字數 1774 閱讀 2195

在使用阿里的rocketmq傳送資料時,大資料量,多執行緒情況下,會產生並非網路原因的傳送失敗.故使用以下傳送方式,方法是採用內部靜態型別的單例模式.

@component

public class producerbeansingleton ")

private string producerid;

@value("$")

private string accesskey;

@value("$")

private string secretkey;

@value("$")

private string onsaddr;

private static producer producer;

private static class singletonholder

private producerbeansingleton (){}

public static final producerbeansingleton getinstance()

@postconstruct

public void init()

public producer getproducer()

public mapsendmsg2mq(string topic, string tag, string object,

string key) throws interruptedexception ,tag={},key={},object={}", topic, tag, key,object);

if (!isstartmq)

mapresult = new hashmap<>();

//producer init = init();

message msg = null;

try else

// 非同步傳送訊息, 傳送結果通過 callback 返回給客戶端。

// producer.sendasync(msg, new sendcallback() ,tag={},

key={},msgid={}", topic, tag, key, sendresult.getmessageid());

//// }

//// @override

// public void onexception(onexceptioncontext context) ,tag={},key={},msgid={}", topic, tag, key, context.getmessageid());

// }

// });

mapresultmap=new hashmap<>();

resultmap.put("msgid",sendresult.getmessageid());

return responseinfo.dook("傳送成功!");

} catch (onsclientexception e) ,topic={},tag={},msgid={}", e.getmessage(), topic, tag, msg.getmsgid());

return responseinfo.dofail(e);

} catch (exception e) ", e.getmessage());

return responseinfo.dofail(e);

}}

RocketMQ 訊息傳送

訊息傳送基本流程 1 訊息驗證 驗證主題 topic 訊息體不能為空和大小不能超過4m。2 路由查詢 a 檢視快取,是否有topic的路由資訊。b 如果沒有則到nameserver中獲取路由資訊,如果快取內能找到則獲取相應路由資訊。c 從快取中獲取上一次異常的broker節點資訊,跟獲取到的節點資訊...

RocketMQ訊息順序傳送和消費問題

事故現場分析 由於創新業務產品上線,運營產品想通過一些活動來刺激使用者,採用註冊邀請機制即可獲取積分的相關活動。考慮到後續可能還有其他可能的活動來發放積分,所以設計的時候,採用mq訊息模式發放積分,非同步解耦,並能夠保證資料的最終一致性。考慮到使用者積分計算的時候可能存在併發操作的情況,想到兩種解決...

RocketMQ(04) 傳送順序訊息

如果你的業務上對訊息的傳送和消費順序有較高的需求,那麼在傳送訊息的時候你需要把它們放到同乙個訊息佇列中,因為只有同乙個佇列的訊息才能確保消費的順序性。下面 我們在傳送訊息的時候,呼叫的是需要傳遞messagequeueselector的send 該方法還可以傳遞乙個額外的引數,其對應messageq...