RocketMQ(04) 傳送順序訊息

2021-10-02 07:26:40 字數 1556 閱讀 3773

如果你的業務上對訊息的傳送和消費順序有較高的需求,那麼在傳送訊息的時候你需要把它們放到同乙個訊息佇列中,因為只有同乙個佇列的訊息才能確保消費的順序性。下面**我們在傳送訊息的時候,呼叫的是需要傳遞messagequeueselector的send(),該方法還可以傳遞乙個額外的引數,其對應messagequeueselector的select()的最後乙個引數。下面**中我們一共傳送了10條訊息,從1開始算順序為奇數的都放到第乙個佇列中,順序為偶數的都放第二個佇列中。所以最終第乙個佇列放了順序號為1/3/5/7/9的訊息,第二個佇列中放了順序號為2/4/6/8/10的訊息。

@test

public

void

testordersend()

throws exception

}, i)

; assert.

asserttrue

(sendresult.

getsendstatus()

== sendstatus.send_ok);}

producer.

shutdown()

;}

在消費的時候如果需要確保佇列中的訊息是按照順序消費的,註冊訊息***時不能再選擇併發消費的messagelistenerconcurrently,而需要選擇按順序消費的messagelistenerorderly。按順序消費時每個執行緒會鎖定當前佇列,只有一條訊息消費完了才會釋放鎖,這樣確保同一佇列同時只能有乙個執行緒消費一條訊息。而併發消費時是不會一直鎖佇列的。有序消費時同乙個佇列裡面的訊息會按照順序進行消費,但是它們可能被不同的執行緒消費。如訊息的順序是1/2/3/4/5/6,則按照順序消費可以保證訊息的消費順序一定是1/2/3/4/5/6,但是消費它們的執行緒有可能是執行緒6/5/4/3/2/1。如果要保證有序的消費是在同乙個執行緒完成的,則消費者執行緒只能有乙個,可以通過setconsumethreadmax()定義消費執行緒的最大數,可以通過setconsumethreadmin設定消費者執行緒的最小數。下面的**中就定義了按照順序進行訊息的消費。

@test

public

void

testorderconsume()

throws exception })

; consumer.

start()

; timeunit.seconds.

sleep

(120);

consumer.

shutdown()

;}

順序訊息消費時返回的consumeorderlystatus只能是success和suspend_current_queue_a_moment。success表示消費成功,可以繼續消費下一條,而suspend_current_queue_a_moment表示消費失敗,需要等待一下再繼續消費。它不能像併發消費那樣跳過消費失敗的訊息,因為那樣就破壞了訊息消費的順序性。

(注:本文是基於rocketmq4.5.0所寫)

RocketMQ 04 簡單運維

mqadmin updatetopic b localhost 10911 t topiccmd.mqadmin deletetopic n localhost 9876 c localhost 10911 t topiccmd.mqadmin topiclist n localhost 9876....

04 傳送訊息和對話方塊

一.傳送訊息 1.sendmessage 呼叫視窗過程函式,並且等待視窗過程函式處理完畢才返回。2.postmessage 將訊息投遞到訊息佇列,不等待處理,立即返回。3.自定義訊息 大於等於wm user 二.對話方塊 1.模態對話方塊 1.寫法 1.設計對話方塊資源模板 2.實現對話方塊過程函式...

八 傳送郵件

傳送郵件 如下 from email.mime.text import mimetext from email.header import header import smtplib import datetime from poseidon.myutil import myutil import ...