ActiveMQ訊息策略

2021-09-30 13:14:58 字數 4904 閱讀 3060

activemq訊息策略

activemq中提供了眾多的「策略」(policy),它們可以在broker端為每個通道「定製」訊息的管理方式。本文將簡單描述主要的幾種policy。

一. dispatchpolcicy: **策略(topic)

此策略表明broker端訊息**給多個consumer時,訊息被傳送的順序性,這個順序通常指consumer的順序,只對topic有效,它有3種常用的型別:

1) roundrobindispatchpolicy: 「輪詢」,訊息將依次傳送給每個「訂閱者」。「訂閱者」列表預設按照訂閱的先後順序排列,在**訊息時,對於匹配訊息的第乙個訂閱者,將會被移動到「訂閱者」列表的尾部,這也意味著「下一條」訊息,將會較晚的**給它。

2) strictorderdispatchpolicy: 嚴格有序,訊息依次傳送給每個訂閱者,按照「訂閱者」訂閱的時間先後。它和roundrobin最大的區別是,沒有移動「訂閱者」順序的操作。

3) prioritydispatchpolicy: 基於「property」權重對「訂閱者」排序。它要求開發者首先需要對每個訂閱者指定priority,預設每個consumer的權重都一樣。

4) ******dispatchpolicy: 預設值,按照當前「訂閱者」列表的順序。其中prioritydispatchpolicy是其子類。

"輪詢"是比較常用的策略。

">               

二.subscriptionrecoverypolicy: 恢復策略(topic)

在非耐久訂閱者「失效」期間或乙個新的topic,broker可以保留的可追溯的訊息量。前提是topic必須是「retroactive」,我們可以在distination位址中指定此屬性,例如:"order.topic?consumer.retroactive=true"。預設情況下,訂閱者只能獲取「訂閱」開始之後的訊息,如果retroactive=true,那麼訂閱者就可以獲取其建立之前的訊息列表。此policy就是用來控制「retroactive」的訊息量。

1) fixedsizedsubscriptionrecoverypolicy: 儲存一定size的訊息,broker將為此topic開闢定額的ram用來儲存最新的訊息。

2) fixedcountsubscriptionrecoverypolicy: 儲存一定條數的訊息。

3) lastimagesubscriptionrecoverypolicy: 只保留最新的一條資料

4) querybasedsubscriptionrecoverypolicy: 符合置頂selector的訊息都將被儲存,具體能夠「恢復」多少訊息,由底層儲存機制決定;比如對於非持久化訊息,只要記憶體中還存在,則都可以恢復。

5) timedsubscriptionrecoverypolicy: 保留最近一段時間的訊息。

6) nosubscriptionrecoverypolicy: 關閉「恢復機制」。預設值。

">               

三. deadletterstrategy: 「死信」策略

broker將如何管理「死信」。當訊息過期後,或者「重發」幾次之後仍然不能被正常訊息,那麼此訊息將會被移除到deadletter佇列中。此後,我們可以通過偵聽死信佇列,來獲取相關通知或者對訊息做額外的操作。

1)  individualdeadletterstrategy: 把deadletter放入各自的死信通道中,對於queue而言,死信通道的字首預設為「activemq.dlq.queue.」,topic為「activemq.dlq.topic.」;比如佇列order,那麼它對應的死信通道為「activemq.dlq.queue.order」。我們使用「queueprefix」「topicprefix」來指定上述字首。

預設情況下,無論是topic還是queue,broker將使用queue來儲存deadleader,即死信通道通常為queue;不過開發者也可以指定為topic。

上述將佇列order**現的deadletter儲存在dlq.order中,不過此時dlq.order為topic。individualdeadletterstrategy還有乙個屬性為「usequeuefortopicmessages」,此值表示是否將topic的deadletter儲存在queue中,預設為true。 

2) shareddeadletterstrategy: 將所有的deadletter儲存在乙個共享的佇列中,這是activemq broker端預設的策略。共享佇列預設為「activemq.dlq」,可以通過「deadletterqueue」屬性來設定。

3) discardingdeadletterstrategy: broker將直接拋棄deadleatter。如果開發者不需要關心deadletter,可以使用此策略。acitvemq提供了乙個便捷的外掛程式:discardingdlqbrokerplugin,來拋棄deadletter。

對於上述三種策略,還有2個很重要的可選引數,「proces***pired」表示是否將過期訊息放入死信佇列,預設為true;「processnonpersistent」表示是否將「非持久化」訊息放入死信佇列,預設為false。

四. pendingmessagelimitstrategy: 訊息限制策略(面向slow consumer)

此策略只對topic有效,當通道中有大量的訊息積壓時,broker可以保留的訊息量。

1) constantpendingmessagelimitstrategy: 保留固定條數的訊息,如果訊息量超過limit,將使用「messageevictionstrategy」移除訊息。

">		

2) prefetchratependingmessagelimitstrategy: 保留prefetchsize倍數條訊息。

五. messageevictionstrategy: 訊息剔除策略(面向slow consumer)

配合pendingmessagelimitstrategy,只對topic有效。當pendingmessage的數量超過限制時,broker該如何剔除多餘的訊息。

1) oldestmessageevictionstrategy: 移除舊訊息,預設策略。

2) oldestmessagewithlowestpriorityevictionstrategy: 舊資料中權重較低的訊息,將會被移除。(message.getpriority())

3) uniquepropertymessageevictionstrategy: 移除具有指定property的舊訊息。開發者可以指定property的名稱。

">		

上述配置,針對sku.price通道中,只保留10000個最新的訊息,如果有新訊息繼續加入,將會移除具有price屬性的舊訊息。在每條訊息中,封裝多個商品的**列表,我們可以實現「最新商品**」的功能。 

六. slowconsumerstrategy: 慢速消費者策略

broker將如何處理慢消費者。broker將會啟動乙個後台執行緒用來檢測所有的慢速消費者,並定期關閉關閉它們。

1) abortslowconsumerstrategy: 中斷慢速消費者,慢速消費將會被關閉。

2) abortslowconsumerstrategy: 如果慢速消費者最後乙個ack距離現在的時間間隔超過閥值,則中斷慢速消費者。

七. pendingqueuemessagestoragepolicy: 待訊息訊息轉存策略

當queue中有大量的「non_persistent」訊息時,將會消耗jvm記憶體,直到oom或者達到broker端設定的systemusage。為了避免這些問題,我們需要將這些訊息「轉存」到特定的儲存器中。

1) vmqueuecursor: 將訊息轉存到基於記憶體(jvm linkelist)的儲存結構中。預設設定。有oom風險。

2) storecursor: 將訊息轉存到storeengine中,如果broker使用kahadb/leveldb,那麼訊息將會被轉存到相應的儲存引擎中。這是強烈推薦的策略,也是效率最好的策略。

3) filequeuecursor: 將訊息轉存到臨時檔案中。 

" producerflowcontrol="true" memorylimit="512mb">

八. pendingsubscribermessagestoragepolicy:(topic)

針對「非耐久」訂閱者。概念和(七)一樣,支援三種策略:storecursor, vmcursor和filecursor。

九. pendingdurablesubscribermessagestoragepolicy: (topic)

針對「耐久」訂閱者,支援三種策略:storedurablesubscribercursor, vmdurablecursor和 filedurablesubscribercursor。

" producerflowcontrol="false" memorylimit="10mb">  

ActiveMQ中訊息許可權策略

在activemq傳送訊息的時候,可以通過messageauthorizationpolicy驗證訊息是否可以傳送到訊息消費者。package org.apache.activemq.security import org.apache.activemq.broker.connectionconte...

ActiveMQ訊息佇列

這個東西沒接觸到的時候挺懵的,用過一次之後,哇哦 public class producter catch jm ception e public void sendmessage string disname,listlist else thread.sleep 1000 int num coun...

Activemq訊息型別

activemq訊息型別 jms規範中的訊息型別包括textmessage mapmessage objectmessage bytesmessage 和streammessage 等五種。activemq也有對應的實現,下面我們結合spring jms分別來看一下五種訊息型別的收發 1 textm...