RocketMQ 筆記2 檔案儲存

2021-08-19 23:57:45 字數 2747 閱讀 6233

defaultmessagestore儲存操作類

private final commitlog commitlog;

private final concurrentmap> consumequeuetable;

private final flushconsumequeueservice flushconsumequeueservice;//consumequeue檔案刷盤執行緒

private final cleancommitlogservice cleancommitlogservice;//定時清理commitlog執行緒

private final cleanconsumequeueservice cleanconsumequeueservice;//定時清理consumequeue執行緒

private final indexservice indexservice;//索引檔案

private final reputmessageservice reputmessageservice;//監控commitlog檔案有變化寫consumequeue

private final haservice haservice;

1.org.apache.rocketmq.store.defaultmessagestore.load()  載入檔案過程

這樣的話假如最後乙個檔案沒有寫滿,停機,再啟動好像不會接著寫這個檔案,會重新新建乙個後續檔案寫訊息

2.org.apache.rocketmq.store.defaultmessagestore.putmessage(messageextbrokerinner) 寫訊息過程->

如果writebuffer不為null,會先寫到writebuffer快取裡面,後續又執行緒定時commit寫到filechannel裡面,寫訊息格式

寫完後wroteposition增加寫的長度

同步刷盤:

向groupcommitservice提交乙個groupcommitrequest,同時阻塞執行緒,等待刷盤結果

非同步刷盤:

4.consumequeue檔案的寫入

org.apache.rocketmq.store.defaultmessagestore.reputmessageservice 啟動後監控commitlog檔案maxoffset有無變化,

當有變化時候

從bytebuffer中讀取訊息內容,構建dispatchrequest,包含topic,topic,commitlogoffset,msgsize,tagscode,有了這些就可以寫consumequeue檔案了

dispatchrequest dispatchrequest =

defaultmessagestore.this.commitlog.checkmessageandreturnsize(result.getbytebuffer(), false, false);

呼叫defaultmessagestore.this.dodispatch(dispatchrequest)

commitlogdispatcherbuildconsumequeue  寫consumequeue檔案

commitlogdispatcherbuildindex 寫index檔案

5.檔案結構

commitlog

每個檔案的大小預設為1g,commitlog的檔名filename,名字長度為20位,左邊補0,剩餘為起始偏移量,比如 00000000000000000000 代表了第乙個檔案,起始偏移量為 0, 檔案大小為 1g=1073741824; 當這個檔案滿了,第二個檔案名字為 00000000001073741824

consumequeue

訊息起始物理偏移量(physical offset, long 8位元組)+訊息大小(int,4位元組)+tagscode(long 8位元組) 

每個 cosumequeue 檔案的名稱 filename,名字長度為 20 位,左邊補零,剩餘為起始偏量; 比如 00000000000000000000 代表了第乙個檔案,起始偏移量為 0,檔案大小為 600w, 當第乙個檔案滿之後建立的第二個檔案的名字為00000000000006000000,起始偏移量為6000000,以此類推,第三個檔案名字為00000000000012000000,起始偏移量12000000。訊息儲存的時候會順序寫入檔案,當檔案滿了,寫入下乙個檔案。

6.broker讀訊息過程

org.apache.rocketmq.broker.processor.pullmessageprocessor.processrequest(channel, remotingcommand, boolean) 收到拉取訊息請求

org.apache.rocketmq.store.defaultmessagestore.getmessage(string, string, int, long, int, messagefilter) 使用defaultmessagestore讀訊息

consumequeue consumequeue = findconsumequeue(topic, queueid) 找到對應consumequeue

long offsetpy = bufferconsumequeue.getbytebuffer().getlong();

int sizepy = bufferconsumequeue.getbytebuffer().getint();

long tagscode = bufferconsumequeue.getbytebuffer().getlong();  得到訊息偏移位址,大小,tagscode

Linux筆記(2) 檔案許可權

1 user group others 2 chgrp 設定檔案所屬群 chgrp r groupname filename 3 chown 修改檔案擁有者 1 chown username filename 2 chown username groupname filename 同時修改擁有者和群...

linux筆記2 檔案許可權

許可權是什麼 檔名 drwxrwxrwx d代表資料夾 如果是 代表普通檔案 c代表字元裝置 l代表鏈結檔案 第乙個rwx代表擁有著的許可權 第二個rwx代表所屬組的許可權 第三個rwx代表其他使用者的許可權 怎麼改變許可權 1.去掉擁有者的許可權 chmod u r file u代表的是讀許可權 ...

1 檔案測試 2 檔案操作

1 檔案測試函式 2 檔案操作 新建檔案 fopen filename,w 以 寫 的方式開啟乙個不存在的檔案,就會新建該檔案 檔案刪除 unlink 檔案複製 copy filename,aaa bb.txt 盡量使用 和相對路徑,因為linux只認 也沒有磁碟分割槽,而windows 和 都認 ...