RocketMQ訊息刷盤

2021-10-09 22:19:33 字數 4550 閱讀 1164

刷盤策略

commitlog在初始化的時候,會根據配置,啟動兩種不同的刷盤服務。

1. broker 同步刷盤

if

(flushdisktype.sync_flush ==

this

.defaultmessagestore.

getmessagestoreconfig()

.getflushdisktype()

)catch

(interruptedexception

| executionexception | timeoutexception e)

if(flushstatus != putmessagestatus.put_ok)

}else

}//先放到內部的乙個請求佇列中,並利用waitpoint通知新請求到來

//客戶端提交同步刷盤任務到 groupcommi tservic 執行緒,如果廢執行緒處於等待狀態則將其喚醒

public

synchronized

void

putrequest

(final groupcommitrequest request)

if(hasnotified.

compareandset

(false

,true))

}//由於避免同步刷盤消費任務與其他訊息生產者提交任務直接的鎖競爭, groupcommitservice 提供讀容器與寫容器,這兩個容器每執行完一次任務後,互動,繼續消費任務

private

void

swaprequests()

構建groupcommitrequest 同步任務,並提交到groupcommitservice.

同步等待刷盤結果,刷盤失敗也會標誌訊息儲存失敗,返回 flush_disk_timeo. 進行同步刷盤的服務為 groupcommitservice,當groupcommitrequest請求被提交給groupcommitservice後,groupcommitservice並不是立即處理,而是先放到內部的乙個請求佇列中,並利用waitpoint通知新請求到來

等待同步刷盤任務完成,如果超時則返回刷盤錯誤, 刷盤成功後正常返 回給呼叫方groupcommitrequest

public

boolean

waitforflush

(long timeout)

catch

(interruoptedexception e)

消費傳送執行緒將訊息追加到記憶體對映檔案後,將同步任務 groupcommitrequest 提交到groupcommitservice 執行緒,然後呼叫阻塞等待刷盤結果,超時時間預設 5s

public

void

wakeupcustomer

(final

boolean flushok)

4.groupcommitservice 執行緒處理 groupcommitrequest 物件後將呼叫 wakeupcustomer法將消費傳送執行緒喚醒,並將刷盤告知 groupcommitrequest

public

void

run(

)catch

(exception e)

}}

groupcommitservice 每處理一 批同步刷盤請求( requestsread 容器中請求)後「休息」 1oms 然後繼續處理 一批,其任務的核心實現為 do commit 方法

for

(groupcommitrequest req :

this

.requestsread)

} req.

wakeupcustomer

(flushok)

;}

3))處理完所有同步刷盤任務後,更新刷盤檢測點 storecheckpoint 中的 ph ysicmsgtimestamp ,但並沒有執行檢測點的刷盤操作,刷盤檢測點的刷盤操作將在 寫訊息佇列檔案時觸發

同步刷盤的任務雖然也是在非同步執行緒中執行,但是訊息儲存的主流程中會同步等待刷盤結果,所以本質上還是同步操作。

2.broker 非同步刷盤

非同步刷盤根據是否開啟 transientstorepoolenable 機制 ,刷盤實現會有細微差別. 如果transientstorepoolenable為true, rocketmq 會單獨申請乙個與目標物理文 commitlog) 同樣大小的堆外記憶體, 該堆外記憶體將使用 記憶體鎖定,確保不會被置換到虛擬記憶體中去,訊息首先追加到堆外記憶體,然後提交到與物理檔案的記憶體對映記憶體中,再 flush 磁碟,如果transientstorepoolenable為 flalse ,訊息直接追加到與物理檔案直接對映的記憶體中,然後刷寫到磁碟

// asynchronous flush

else

else

} flushcommitlogservice.

wakeup()

;

flushrealtimeservice 在啟動後,會在死迴圈中週期性的進行刷盤操作

while(!

this

.isstopped()

)try

else

if(printflushprogress)

long begin = system.

currenttimemillis()

;// 休眠結束,開始執行刷盤操作

commitlog.

this

flush

(flushphysicqueueleastpages)

;long storetimestamp = commitlog.

this

getstoretimestamp()

;if(storetimestamp >0)

long past = system.

currenttimemillis()

- begin;

if(past >

500)

ms", past);}

}catch

(throwable e)

}

獲取四個配置引數

如果距上次提交間隔超過 flushphysicqueuethoroughinterval ,則本次刷盤任務將忽略flushphysicqueueleastpages 也就是如果待刷 寫資料小於指定頁數也執行刷寫磁碟操作

:執行一次刷盤任務前先等待指定時間間隔, 然後再執行刷盤任務

呼叫 flush 方法將記憶體中資料刷寫到 盤,並且更新儲存檢測點檔案的comm1tlog 檔案的更新時間戳,檔案檢測點檔案( checkpoint 檔案)的刷盤動作在刷盤訊息消費佇列中執行, 其入口為 defaultmessagestore#flushconsumequeueservice

通過上面這段邏輯可知,非同步刷盤就在非同步執行緒中,週期性的將記憶體緩衝區的內容刷到檔案中,在訊息主流程中,只會喚醒非同步刷盤執行緒,而不會同步等待刷盤結果,所以稱為非同步刷盤。

無論是上面哪種刷盤策略,最終都呼叫了下面這個方法進行刷盤:

commitlog.

this

flush

(flushphysicqueueleastpages)

;public

boolean

flush

(final

int flushleastpages)

}return result;

}

public

intflush

(final

int flushleastpages)

else

}catch

(throwable e)

this

.flushedposition.

set(value)

;this

.release()

;}else

}return

this

.getflushedposition()

;}

判斷是否滿足刷盤條件,isabletoflush()其實就是判斷當前剩餘未刷盤內容長度,是否超過最小刷盤長度:flushleastpages,避免不必要的刷盤操作。

如果滿足刷盤條件,則將記憶體中的內容刷到檔案中

同步刷盤的任務雖然也是在非同步執行緒中執行,但是訊息儲存的主流程中會同步等待刷盤結果,所以本質上還是同步操作。

非同步刷盤就在非同步執行緒中,週期性的將記憶體緩衝區的內容刷到檔案中,在訊息主流程中,只會喚醒非同步刷盤執行緒,而不會同步等待刷盤結果,所以稱為非同步刷盤

RocketMq刷盤機制

synchronization flush 同步刷盤 if flushdisktype.sync flush this.defaultmessagestore.getmessagestoreconfig getflushdisktype else asynchronous flush else el...

RocketMQ訊息型別

普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...