ActiveMQ高階原理

2021-10-04 17:54:21 字數 3328 閱讀 5955

1.應答模式

通過連線建立session時設定

// a是boolean型別,設定是否開啟事務。如果為true則第二個引數設定無效,應答模式自動為session.session_transacted

// b是應答模式

connection.

createsession

(a,b)

應答模式有:session.auto_acknowledge 自動ack(確認)

session.client_acknowledge 客戶端ack

session.session_transacted 事務

以消費端舉例:

如果為auto_acknowledge,則客戶端接收到訊息就自動ack了,不管消費端處理過程中是否會問題。

如果為client_acknowledge,則需要客戶端處理訊息後通過textmessage.acknowledge()方法反饋ack,否則broker則認為當前訊息沒有被消費。

訊息被ack後,才會認為被成功訊息,broker才會刪除訊息;如果消費了不ack則會重複消費

事務:在傳送時,訊息中會帶有transaction id,broker收到後會將訊息放到 trsancation store中,只有事務提交了,消費者才能消費訊息。

2.訊息持久化

單機的activemq預設是用的kahdb儲存,集群模式下可用leveldb或者共享儲存目錄(nfs).

一般我們在實際應用中都是會持久化的,在producer設定

producer.

setdeliverymode

(deliverymode.persistence)

// 不持久化 deliverymode.non_persistence

3.訊息同步與非同步傳送

預設情況下:持久化訊息是同步傳送(broker會響應ack),非持久化訊息是非同步傳送(存在丟訊息的風險)。

非同步我們都是同步傳送,也就是預設的。

修改為非同步傳送:

1.在broker連線後面加上引數,比如:

tcp://localhost:61616?jms.useasyncsend=true
2.也可在連線工廠或者連線上設定,比如:

((activemqconnection)connection).setuserasyncsend(true)
4.生產者流量控制

此引數的意思是:producer允許的最大未被ack的訊息,比如我設定為10k,producer已經傳送了10條1k的訊息,都還沒收到broker的ack影響,此時producer就阻塞不能傳送了,只有等到broker 響應ack後 才能繼續傳送了。

因為同步傳送是等待ack的,所以這個引數對非同步傳送才有意義,用來約束在非同步傳送時producer允許非同步傳送(未ack)的訊息最大尺寸

producerwindowsize值越大則越消耗broker的記憶體,因為沒有持久化的訊息肯定是存在內容的。

設定方式:

activemqconnectionfactory.

setproducerwindowsize()

;

tcp://localhost:61616?jms.producerwindowsize=1048576
還可以為每個queue設定:

test?producer.windowsize=1048576
5.消費者 prefetch機制

prefetch:設定broker一次批量推送多少條給consumer,此值預設是1000,如果消費者消費很快,再結合optimizeack,則將大大提高效率。這個引數只對queue有用。

如果consumer數量較多,或者消費速度較慢,或者訊息量較少時,我們應該設定prefetchsize為較小的值。

預設情況下,**broker採用非同步發現向consumer主動推送訊息(push)。**如果將prefetchsize設定為0,對consumer來說就是pull模式。

設定prefetchsize:

在queueurl中指定:

test?customer.prefetchsize=100
也可在brokerurl中指定

jms.prefetchpolicy.queueprefetch=12&jms.prefetchpolicy.topicprefetch=12
optimizeack:關係到是否批量確認訊息,只有session的ack_mode為auto_acknowledge時才會生效。

當consumer已經消費但未ack達到prefetch * 0.65 或者達到執行時間是會傳送ack。

在brokerurl中設定:

jms.optimizeacknowledge=true&jms.optimizeacknowledgetimeout=30000
6.訊息同步與非同步接收

同步接收

當consumer端使用receive()方法同步獲取訊息時,prefetch可以為0和任意正值;

當prefetch=0時,receive()方法將會首先傳送乙個pull指令並阻塞,直到broker端返回訊息為止,這

也意味著訊息只能逐個獲取,這也是activemq中pull訊息模式;

當prefetch > 0時,broker端將會批量push給client 一定數量的訊息(<= prefetch),client端會把這些訊息放入到本地的佇列中,只要此佇列有訊息,那麼receive方法將會立即返回,當一

定量的訊息ack之後,broker端會繼續批量push訊息給client端。

非同步接收

當consumer端使用messagelistener非同步獲取訊息時,這就需要開發設定的prefetch值必須 >=1,即至少為1;在

非同步消費訊息模式中,設定prefetch=0,是相悖的,也將獲得乙個exception

ActiveMQ的工作原理

如圖所示 首先來看本地通訊的情況,應用程式a和應用程式b執行於同一系統a,它們之間可以借助訊息佇列技術進行彼此的通訊 應用程式a向佇列1傳送一條資訊,而當應用程式b需要時就可以得到該資訊。其次是遠端通訊的情況,如果資訊傳輸的目標改為在系統b上的應用程式c,這種變化不會對應用程式a產生影響,應用程式a...

ActiveMq 高階特性的使用

消費者的 destination 可以使用 wildcards 生產者的 destination 可以使用 composite destinations virtualtopic 真是一大利器,當初讀這本書就是為了弄懂這個東西,沒想到書快讀完了才找到,然後找到了才發現是這麼簡單的概念,以前對 jms...

activemq實現通訊原理 ActiveMQ

1.什麼是activemq?訊息中介軟體。可以在分布式系統的不同服務之間進行訊息的傳送和接收 2.activemq的作用以及原理?activemq的作用就是實現跨網路的習性與系統劍通訊,可以將業務解耦,提供非同步訊息支援,增加系統併發量.比如原本執行乙個操作需要1s,那麼使用者請求後必須等待1s之後...