訊息的寫入和讀取流程

2021-08-06 04:35:49 字數 2617 閱讀 7505

接之前幾篇訊息中介軟體元件的模組劃分,本篇內容講述訊息的寫入和讀取流程。

佇列模型

在描述訊息的寫入和讀取流程之前,首先要弄清楚訊息佇列的模型是怎麼樣的,包括訊息是怎麼儲存的。

簡化的佇列模型大致如上圖所示。

儲存模型

為了便於理解,上面的佇列模型中僅僅將訊息的乙個寫入佇列抽象成乙個topic partition,但在實踐中這是不夠的。

訊息有使用者產生並寫入訊息佇列,每一條訊息都是不一樣的,在實踐中這樣「一層」的結構是無法滿足要求的。

如上圖,直接採用「一層」的結構儲存訊息。那麼:

「一層」的儲存模型在實踐中是無法使用的。實踐中對訊息儲存的模型往往是分為索引+儲存的兩層結構,rocketmq也是這種實現。

訊息的儲存模型分為兩層,其中:

storage queue為儲存佇列,儲存實際的訊息(完成的訊息,包含各種屬性和內容)

index queue是訊息的索引佇列,元素長度是固定的,比如元素內容為訊息位置和訊息大小(這樣12個位元組可以完整的定位出一條訊息)

這樣做的優勢:

有了訊息佇列模型的認識之後,來梳理訊息寫入流程會清晰的多。

幾點共識:

topic是有多分割槽的,一條訊息只會落到乙個分割槽中,所以這裡包含了乙個路由策略;

訊息儲存包含了索引佇列和儲存佇列,所以寫入一條訊息時除了儲存訊息本身,還需要構建訊息索引

訊息是寫到broker的磁碟上的,會涉及到刷盤操作

訊息的寫入流程大致如下:

(流程中忽略了非核心的步驟和錯誤的處理,比如訊息合法性的驗證、元資料獲取失敗的處理等)

其中1-4步為producer上的操作;5-8步為服務端流程。

元資料通過nameserver獲取。元資料中需要包含topic分割槽的分布情況,即topic有多少個分割槽,每個分割槽落在哪台伺服器上

路由部分實現了訊息和分割槽的對應關係。因為訊息佇列只會保證分區內資料的順序性,所以當一些訊息需要保證順序時,我們需要將這些訊息寫入到同乙個分割槽,路由策略需要保證這一點

序列化和網路包的處理包含了訊息儲存協議的內容和網路相關協議的內容,這塊看kafka和rcoektmq都是自定義協議,之後會專門抽篇幅講怎麼設計這塊的協議

對producer而言,寫入訊息就是向broker傳送乙個請求,對producer而言,這裡需要支援非同步寫入和同步寫入兩種操作

接收寫入請求和反序列就是按照上面的自定義協議獲取到訊息內容進行驗證、寫入等後續處理(這裡會有很多優化,比如減少記憶體拷貝、減少記憶體開銷等)

訊息是寫磁碟的,所以這裡會寫pagecache之後刷盤(這部分之後也會單獨展開講)

訊息是需要持久化之後才能響應客戶端寫入完成的,所以這裡需要等待刷盤;等待刷盤和索引佇列是可以同時進行的,從producer的角度來說它並不關心索引是否構建,只需要資料寫入儲存成功即可

consumer相對producer來說會多乙個協同工作的部分,所以會有乙個分割槽分配的過程(類似producer的寫入訊息的分割槽路由)。另外,consumer會涉及到消費語義(most-once、least-once、exactly-once),還有訊息的獲取模式(pull、push、long-polling、pull-push),不過這這部分的消費流程中不會展開討論這些內容,而是簡要的描述流程,現有大致的認識。

(同樣這裡忽略了一些非核心的流程)

consumer端流程包括1-5及9、10,6-8位broker端流程。

具體每一步操作的內容如下:

元資料的獲取和producer類似

consumer因為需要和其他相同group的consumer協同工作,所以需要知道有多少個同組的consumer存在

consumer需要「固定」分割槽消費,這裡有乙個分配策略需要實現,即根據存在的consumer例項和topic的元資料,計算出每個consumer需要消費的分割槽,consumer和分割槽的對應關係在正常情況下是不應該發生變動的

consumer在每次獲取訊息的時候都需要告知broker從哪個位點開始獲取,所以在初始化時需要獲取到讀取的位置(之後直接從記憶體獲取每次要讀取的位置即可)

這裡也是乙個互動協議的部分,可以採用自定義協議,也可以採用json之類的協議(可以和元資料操作之類的保持一致)

consumer提交的消費進度是indexqueue的序列號,indexqueue元素是定長的,所以可以直接計算出讀取的偏移量,然後讀取indexqueue的元素

indexqueue的元素包含了訊息儲存的資訊,通過這些資訊可以讀取到完整的一條訊息(這裡會一次讀取一批訊息給客戶端,所以會按照indexqueue的元素順序讀取storagequeue的內容,然後返回;為了權衡延遲,在讀取不到下一條訊息的時候也會返回,這裡會有很多策略)

按照協議將讀取的storagequeue的內容返回給consumer(這裡會涉及到zero copy的內容來優化效能,之後再講)

consumer需要知道儲存協議,然後按照協議解析出訊息內容

消費和獲取訊息是非同步的過程,獲取訊息的執行緒在獲取訊息提交到consumer的buffer後就可以開始讀取下一批訊息,而消費執行緒非同步來從buffer獲取訊息進行消費(這裡消費後需要提交消費進度到broker,也可以在獲取訊息的請求中將消費進度帶上去)

以上是訊息寫入和訊息讀取的簡要流程,在寫入流程中會涉及到一些記憶體池、mmap的技術,讀取時會有zerocopy等,這些都會在之後進行分析。

HDFS讀取和寫入流程

1 讀檔案的過程 首先 client 通過 file system 的 open 函式開啟檔案,distributed file system 用 rpc呼叫 namenode 節點,得到檔案的資料塊資訊。對於每乙個資料塊,namenode 節點返回儲存資料塊的資料節點的位址。distributed...

HDFS寫入與讀取流程

那麼問題來了,如果他們之間的乙個datanode突然壞掉了怎麼辦。1 如果傳輸過程中,有某個datanode出現了故障,那麼當前的pipeline會被關閉,出現故障的datanode會從當前的pipeline中移除,剩餘的block會繼續剩下的datanode中繼續以pipeline的形式傳輸,同時...

HDFS的讀取流程 寫入流程 刪除流程

讀流程 1 客戶端通過rpc訪問namenode。呼叫filesystem的open方法,獲取distributedfilesystem例項 2 namenode查詢元資料,獲取元資料路徑,將檔案的全部或部分檔案塊的儲存路徑,放入佇列,傳送給客戶端。3 客戶端收到佇列 fsdatainputstre...