RocketMq刷盤機制

2022-03-28 13:56:30 字數 2918 閱讀 4485

// synchronization flush 同步刷盤

if (flushdisktype.sync_flush == this.defaultmessagestore.getmessagestoreconfig().getflushdisktype())

} else

}// asynchronous flush ②

else else

}}①同步刷盤使用groupcommitservice

②非同步刷盤 且開啟了transientstorepoolenable且不是從伺服器,使用commitlogservice 否則使用flushcommitlogservice刷盤

public synchronized void putrequest(final groupcommitrequest request) 

if (hasnotified.compareandset(false, true))

}

public void run()  catch (exception e) 

}// under normal circumstances shutdown, wait for the arrival of the

// request, and then flush

try catch (interruptedexception e)

synchronized (this)

this.docommit();

commitlog.log.info(this.getservicename() + " service end");

}

private void docommit() 

}//喚醒等待刷盤完成的阻塞執行緒

req.wakeupcustomer(flushok);

}if (storetimestamp > 0)

this.requestsread.clear();

} else

}}

public boolean flush(final int flushleastpages) 

}return result;

}

①根據刷盤指標找到對應的檔案

public int flush(final int flushleastpages)  else 

} catch (throwable e)

//設定刷盤指標

this.flushedposition.set(value);

this.release();

} else

}//返回刷盤指標位置

return this.getflushedposition();

}

未開啟transientstorepoolenable

public void run() 

try else

if (printflushprogress)

long begin = system.currenttimemillis();

//至少滿4頁才刷盤 但是每10秒將會強制刷盤一次,flushphysicqueueleastpages會被設定為0

if (storetimestamp > 0)

long past = system.currenttimemillis() - begin;

if (past > 500) ms", past);

}} catch (throwable e)

}

非同步將writebuffer的資料刷到filechannel

public void run() 

try

if (end - begin > 500) ms", end - begin);

}this.waitforrunning(interval);

} catch (throwable e)

}boolean result = false;

for (int i = 0; i < retry_times_over && !result; i++)

commitlog.log.info(this.getservicename() + " service end");

}

public boolean commit(final int commitleastpages) 

return result;

}

public int commit(final int commitleastpages) 

//是否可以進行commit,至少堆積commitleastpages頁資料 為0的話表示強制commit

if (this.isabletocommit(commitleastpages)) else

}// all dirty data has been committed to filechannel. 檔案已經寫滿且已經全commit,可以把writebuffer歸還給池子裡了

if (writebuffer != null && this.transientstorepool != null && this.filesize == this.committedposition.get())

//返回commit指標

return this.committedposition.get();

}

protected void commit0(final int commitleastpages)  catch (throwable e) 

}}

RocketMQ訊息刷盤

刷盤策略 commitlog在初始化的時候,會根據配置,啟動兩種不同的刷盤服務。1.broker 同步刷盤 if flushdisktype.sync flush this defaultmessagestore.getmessagestoreconfig getflushdisktype catc...

RocketMQ 主從同步機制

主從同步 ha 高可用 主從同步原理 為了保證系統的高可用,訊息到達主伺服器後,需要將訊息同步到從伺服器。如果主伺服器宕機,消費者可用從從伺服器拉取訊息。大體步驟 1 主伺服器啟動,監聽從伺服器的鏈結。2 從伺服器主動鏈結主伺服器,建立tcp相關鏈結。3 從伺服器主動向主伺服器傳送待拉取訊息偏移量,...

RocketMQ 的訊息傳遞機制及AOP

1,mq中訊息投遞分為兩種,一種是生產者往mq broker種投遞,另一種是broker往消費者投遞 乙個訊息主題對應了多個訊息佇列,所以會產生兩個問題,生成者應該把訊息放入到哪個佇列種,消費者應該從哪個訊息佇列中拉取訊息。因為訊息在系統之間傳遞的時候,跨越網路,訊息的傳播無法保證其有序 生產者投遞...