NSQ 原始碼分析之NSQD Channel

2021-10-04 12:25:14 字數 2781 閱讀 5956

今天主要講的是nsq channel 的**實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。

主要**檔案:

1.nsqd/channel.go

channel結構體

type channel struct
newchannel 主要實現channel的例項化 和 通知 nsqd  有新的 topic建立,讓 nsqd 上報 lookupd。

func newchannel(topicname string, channelname string, ctx *context,

deletecallback func(*channel)) *channel

// 建立記憶體佇列

if ctx.nsqd.getopts().memqueuesize > 0

//?????

if len(ctx.nsqd.getopts().e2eprocessinglatencypercentiles) > 0

//初始化優先順序佇列(延時佇列,消費確認佇列)

c.initpq()

if strings.hassuffix(channelname, "#ephemeral") else

//通知nsqd

c.ctx.nsqd.notify(c)

return c

}

putmessage/put 函式用於發布訊息

// putmessage writes a message to the queue

func (c *channel) putmessage(m *message) error

err := c.put(m) //發布訊息

if err != nil

//增加訊息累計

atomic.adduint64(&c.messagecount, 1)

return nil

}func (c *channel) put(m *message) error

} return nil

}

putmessagedeferred/startdeferredtimeout/putdeferredmessage/addtodeferredpq 四個函式實現訊息的延時, 這個佇列在nsqd中,會有專門的goroutine 維護,間隔時間掃瞄,如果最小堆的根元素小於當前時間,重新加入消費佇列。

func (c *channel) putmessagedeferred(msg *message, timeout time.duration) 

func (c *channel) startdeferredtimeout(msg *message, timeout time.duration) error

err := c.pushdeferredmessage(item) //記錄延時訊息到map

if err != nil

c.addtodeferredpq(item) //加入延時優先順序佇列(根據time 的早晚,實現的最小堆(完全二叉樹))

return nil

}func (c *channel) pushdeferredmessage(item *pqueue.item) error

c.deferredmessages[id] = item //記錄訊息

c.deferredmutex.unlock()

return nil

}func (c *channel) addtodeferredpq(item *pqueue.item)

processdeferredqueue 函式的作用是,處理延時佇列中哪些訊息可以加入消費佇列中進行消費(nsqd維護)

func (c *channel) processdeferredqueue(t int64) bool 

c.addtoinflightpq(msg) //加入優先順序佇列(最小堆)

return nil

}func (c *channel) pushinflightmessage(msg *message) error

func (c *channel) addtoinflightpq(msg *message)

processinflightqueue 函式的作用是,處理消費確認佇列中哪些訊息已超過消費時間需要重新加入消費佇列中進行消費(nsqd維護)

func (c *channel) processinflightqueue(t int64) bool
其他函式說明:

touchmessage:主要用於更新消費超時時間,延遲重新進入佇列的時間

requeuemessage:把在等待消費確認的訊息,重新加入佇列 或者 加入延時佇列,而不是等待時間到來 

popinflightmessage:彈出等待消費確認的訊息

removefrominflightpq:移除等待消費確認的訊息

popdeferredmessage:彈出延時佇列中的訊息

總結:channel主要實現三個佇列,乙個消費佇列(磁碟和記憶體佇列),另乙個是等待消費確認的佇列(inflight),以及延時訊息佇列(deffer)。 其中後面兩個佇列通過nsqd 呼叫 processinflightqueue 和 processdeferredqueue  維護,且它們實現優先順序佇列的方式都是通過完全二叉樹實現最小堆。

下次分享:nsqd對 等待消費確認佇列 和 延時佇列 的維護實現 queuescanloop

NSQ原始碼分析之概述

目前,看了nsqlookupd的 寫的真的很精美,我覺得 可以和redis相媲美,這等後續分析 時再詳說 關於nsq的特性,可以檢視nsq官網 這篇文章主要分析以下幾點 nsq提供了三大元件以及一些工具,三大元件為 nqsd nsq主要元件,用於儲存訊息以及分發訊息 nsqlookupd 用於管理n...

NSQ 原始碼分析之NSQD ProtocolV2

今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。詳細流程參考 中的邏輯流程圖。主要 檔案 1.nsqd protocol v2.go io...

NSQ 原始碼分析之NSQD Topic

今天主要講的是nsq topic 的 實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。主要 檔案 1.nsqd topic.go topic結構體 type topic structnewtopic 函式 主要做三件事,一是例項化topic,二是開啟m...