RocketMQ 位移提交原始碼分析

2021-09-29 08:14:52 字數 4736 閱讀 8129

rocketmq 訊息消費進度是如何提交的,併發消費的時候,一次從 乙個佇列拉 32 條訊息,這 32 條訊息會提交到執行緒池中處理,如果偏移量  m5 比 m4 先執行完成,訊息消費後,提交的消費進度是哪個?是提交訊息 m5 的偏移量?

下面跟著我的節奏,擼一波原始碼。

rocketmq 每次拉取完訊息都會將訊息儲存到 pullrequest 物件中的 processqueue 中:

org.apache.rocketmq.client.consumer.pullcallback#onsuccess

boolean dispathtoconsume = processqueue.putmessage(pullresult.getmsgfoundlist());
接著將訊息放進消費執行緒中去執行:

org.apache.rocketmq.client.consumer.pullcallback#onsuccess

defaultmqpushconsumerimpl.this.consumemessageservice.submitconsumerequest(//

pullresult.getmsgfoundlist(), //

processqueue, //

pullrequest.getmessagequeue(), //

dispathtoconsume);

consumemessageservice 類實現訊息消費的邏輯,它有兩個實現類:

// 併發訊息消費邏輯實現類

org.apache.rocketmq.client.impl.consumer.consumemessageconcurrentlyservice;

// 順序訊息消費邏輯實現類

org.apache.rocketmq.client.impl.consumer.consumemessageorderlyservice;

這裡我們只分析併發消費:

org.apache.rocketmq.client.impl.consumer.consumemessageconcurrentlyservice#submitconsumerequest

consumerequest consumerequest = new consumerequest(msgthis, processqueue, messagequeue);

try catch (rejectedexecutionexception e)

將訊息消費任務封裝成 consumerequest 物件,然後將其交給消費執行緒池中去執行。

org.apache.rocketmq.client.impl.consumer.consumemessageconcurrentlyservice.consumerequest#run:

if (!processqueue.isdropped())  else , msgs={}", messagequeue, msgs);

}

consumerequest 是乙個實現了 runnable 的類,因此訊息消費的核心邏輯都寫在了 run 方法中,如上**是提交已消費位移的邏輯,當 processqueue 沒有被丟棄,則進行已消費位移的提交。

org.apache.rocketmq.client.impl.consumer.consumemessageconcurrentlyservice#processconsumeresult

// 移除已消費的訊息,並返回已消費的

long offset = consumerequest.getprocessqueue().removemessage(consumerequest.getmsgs());

if (offset >= 0 && !consumerequest.getprocessqueue().isdropped())

移除已消費的位移,並返回最小位移量,如果最小位移量大於 0,並且 processqueue 沒有被丟棄,則更新本地快取,

org.apache.rocketmq.client.impl.consumer.processqueue#removemessage

public long removemessage(final listmsgs) 

}// 訊息總量累加

msgcount.addandget(removedcnt);

// 返回訊息容器中最小元素 key

if (!msgtreemap.isempty())

}} finally

} catch (throwable t)

return result;

}

private final treemapmsgtreemap = new treemap<>();
它是乙個 treemap 結構,key 為訊息位移,value 為訊息資料,訊息容器中,訊息可以按照位移進行排序,那也就意味著,當訊息消費完,只需要在訊息容器中移除即可,然後返回訊息容器中最小元素(最小位移),如下:

由於訊息是按照位移進行排序,因此我們只需移除已消費的訊息,並且確保不會將未消費的位移提交,就可避免了位移大的訊息先消費導致訊息丟失的問題了。

org.apache.rocketmq.client.consumer.store.remotebrokeroffsetstore#updateoffset:

public void updateoffset(messagequeue mq, long offset, boolean increaseonly) 

if (null != offsetold) else }}

}

offsettable 為本地位移快取容器,它的結構如下:

private concurrentmapoffsettable = new concurrenthashmap<>();
它是乙個 concurrentmap,乙個執行緒安全容器,key 為 messagequeue,value 為當前 messagequeue 的消費位移,從原始碼看出,當前消費位移的更新,只能是遞增更新。

在更新完本地快取之後,rocketmq 是如何將其提交到 broker 的呢?

org.apache.rocketmq.client.impl.factory.mqclientinstance#startscheduledtask:

this.scheduledexecutorservice.scheduleatfixedrate(new runnable()  catch (exception e) 

}}, 1000 * 10, this.clientconfig.getpersistconsumeroffsetinterval(), timeunit.milliseconds);

以上,消費者在啟動的時候,開啟了乙個定時任務,定時將本地快取提交到broker。

org.apache.rocketmq.client.consumer.store.remotebrokeroffsetstore#persistall:

// 引數mqs是當前分配的佇列

public void persistall(setmqs) catch (exception e)

} else }}

} // 將未分配的佇列從位移快取中移除

if (!unusedmq.isempty()) , {}", mq, this.groupname);}}}

最終會呼叫以上方法,rocketmq 會從重平衡那裡獲取當前消費者已分配的佇列,如果位移快取容器包含在當前分配佇列,則進行消費位移提交,否則將從位移快取容器中移除。

broker 端處理:

org.apache.rocketmq.broker.offset.consumeroffsetmanager#commitoffset

private void commitoffset(final string clienthost, final string key, final int queueid, final long offset)  else , key={}, queueid={}, requestoffset={}, storeoffset={}", clienthost, key, queueid, offset, storeoffset);}}}

以上,offsettable 為 broker 端的消費位移快取容器,它的結構如下:

private concurrentmap> offsettable =

new concurrenthashmap<>(512);

它同樣是乙個 concurrentmap,乙個執行緒安全容器,key 為的形式為 「topic@group」,value 也是乙個 concurrentmap 它的 key 為 queueid,value 為位移,它會以 json 的形式持久化到磁碟$/store/config/consumeroffset.json檔案中,具體格式如下:

}

}

近期熱文

除錯RocketMQ原始碼

拷貝namesrv broker的配置檔案到指定目錄,為了避免直接修改 中的配置檔案。1.1 在f盤建立rocketmq資料夾,建立三個子資料夾conf logs store,我的 中多了dev data的資料夾 1.2 將distribution原始碼conf目錄下的broker.conf log...

rocketmq原始碼打包步驟

1,從git上面轉殖好原始碼之後,進入rocketmq目錄,執行 mvn prelease all dskiptests clean install 2,打包完成之後,進入distribution target目錄,可以看到生成的包 第乙個是未壓縮的包,第二個是linux上面的壓縮包 第三個是win...

RocketMQ原始碼分析 訊息儲存

訊息儲存的地方,資料夾下有多個檔案,每個檔案的大小預設為1g 訊息的組成 欄位名 長度 備註totalsize 4 訊息的長度 magiccode 4 bodycrc 4 body的校驗碼 queueid 4 佇列id flag 4 queueoffset 8 儲存著佇列下訊息的數量,該值儲存在co...