rocketmq原始碼分析之broker心跳檢測

2021-10-17 19:20:08 字數 3297 閱讀 6243

1.brokercontroller傳送心跳包

org.apache.rocketmq.broker.brokercontroller#start()

// 向所有的namesrv註冊broker資訊

this.registerbrokerall(true, false, true);

// 週期性註冊broker資訊,broker伺服器會每間隔30秒(不會低於10秒,高於60秒)向集群中的所有nameserver傳送心跳包

this.scheduledexecutorservice.scheduleatfixedrate(new runnable() catch (throwable e)

}}, 1000 * 10, math.max(10000, math.min(brokerconfig.getregisternameserverperiod(), 60000)), timeunit.milliseconds);

傳送心跳包邏輯:

public listregisterbrokerall(

final string clustername,

final string brokeraddr,

final string brokername,

final long brokerid,

final string haserveraddr,

final listfilterserverlist,

final boolean oneway,

final int timeoutmills,

final boolean compressed)

log.info("register broker to name server {} ok", namesrvaddr);

} catch (exception e) ", namesrvaddr, e);

} finally

}});

}try catch (interruptedexception e)

}return registerbrokerresultlist;

}2.namesrvcontroller儲存broker註冊的心跳包

org.apache.rocketmq.namesrv.processor.defaultrequestprocessor#processrequest()

// broker註冊請求

case requestcode.register_broker:

version brokerversion = mqversion.value2version(request.getversion());

if (brokerversion.ordinal() >= mqversion.version.v3_0_11.ordinal()) else

註冊broker資訊邏輯:

org.apache.rocketmq.namesrv.routeinfo.routeinfomanager#registerbroker()

/** * 註冊broker資訊

*/public registerbrokerresult registerbroker(

final string clustername,

final string brokeraddr,

final string brokername,

final long brokerid,

final string haserveraddr,

final listfilterserverlist,

final channel channel)

brokernames.add(brokername);

boolean registerfirst = false;

brokerdata brokerdata = this.brokeraddrtable.get(brokername);

if (null == brokerdata)

string oldaddr = brokerdata.getbrokeraddrs().put(brokerid, brokeraddr);

registerfirst = registerfirst || (null == oldaddr);

&& mixall.master_id == brokerid) }}

}// 註冊broker資訊

brokerliveinfo prevbrokerliveinfo = this.brokerlivetable.put(brokeraddr,

new brokerliveinfo(

system.currenttimemillis(),

channel,

haserveraddr));

if (null == prevbrokerliveinfo) haserver: {}", brokeraddr, haserveraddr);

}if (filterserverlist != null) else

}if (mixall.master_id != brokerid) }}

} finally

} catch (exception e)

return result;

}3.namesrvcontroller定時檢測broker存活

org.apache.rocketmq.namesrv.namesrvcontroller#initialize()

// 路由刪除:定時任務,每10秒會發起一次檢測broker,剔除不活躍的broker

this.scheduledexecutorservice.scheduleatfixedrate(new runnable()

}, 5, 10, timeunit.seconds);

org.apache.rocketmq.namesrv.routeinfo.routeinfomanager#scannotactivebroker()

/** * 路由刪除:每10秒會發起一次檢測broker,剔除不活躍的broker

*/public void scannotactivebroker() {}ms", next.getkey(), broker_channel_expired_time);

// 從brokerlivetable,brokeraddrtable,topicqueuetable移除broker相關資訊

// todo hongyihui

this.onchanneldestroy(next.getkey(), next.getvalue().getchannel());}}

}

RocketMQ原始碼分析 訊息儲存

訊息儲存的地方,資料夾下有多個檔案,每個檔案的大小預設為1g 訊息的組成 欄位名 長度 備註totalsize 4 訊息的長度 magiccode 4 bodycrc 4 body的校驗碼 queueid 4 佇列id flag 4 queueoffset 8 儲存著佇列下訊息的數量,該值儲存在co...

RocketMQ 位移提交原始碼分析

rocketmq 訊息消費進度是如何提交的,併發消費的時候,一次從 乙個佇列拉 32 條訊息,這 32 條訊息會提交到執行緒池中處理,如果偏移量 m5 比 m4 先執行完成,訊息消費後,提交的消費進度是哪個?是提交訊息 m5 的偏移量?下面跟著我的節奏,擼一波原始碼。rocketmq 每次拉取完訊息...

除錯RocketMQ原始碼

拷貝namesrv broker的配置檔案到指定目錄,為了避免直接修改 中的配置檔案。1.1 在f盤建立rocketmq資料夾,建立三個子資料夾conf logs store,我的 中多了dev data的資料夾 1.2 將distribution原始碼conf目錄下的broker.conf log...