NSQ 原始碼分析之NSQD Topic

2021-10-04 05:04:01 字數 2065 閱讀 2927

今天主要講的是nsq topic 的**實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。

主要**檔案:

1.nsqd/topic.go

topic結構體

type topic struct
newtopic 函式 主要做三件事,一是例項化topic, 二是開啟messagepump goroutine 進行訊息處理,三是通知 nsqd 有新的 topic建立,讓 nsqd 上報 lookupd

func newtopic(topicname string, ctx *context, deletecallback func(*topic)) *topic 

// 建立記憶體佇列

if ctx.nsqd.getopts().memqueuesize > 0

//ephemeral 有特殊的用途,暫時還不知道幹啥?

if strings.hassuffix(topicname, "#ephemeral") else

//messagepump 主要作用是,傳送msg給所有訂閱了這個 topic 下的 channel。(channelmap)

t.waitgroup.wrap(t.messagepump)

//通知 nsqd 有新的 topi c建立。

t.ctx.nsqd.notify(t)

return t

}

messagepump 函式,主要處理 topic/channel 的變動及發布訊息給 channel

func (t *topic) messagepump() 

break

} t.rlock()

//收集訂閱的channel

for _, c := range t.channelmap

t.runlock()

if len(chans) > 0 && !t.ispaused()

// main message loop

for

//將 msg 傳送給所有訂閱的 channel

for i, channel := range chans

if chanmsg.deferred != 0

//傳送 msg 到 channel

err := channel.putmessage(chanmsg)

...} }

exit:

t.ctx.nsqd.logf(log_info, "topic(%s): closing ... messagepump", t.name)

}

putmessage/putmessages 函式都是將訊息傳送(put)到topic的佇列(記憶體/磁碟)中,流程基本相同,都是要累計訊息條數和累計訊息的位元組總數。

func (t *topic) putmessage(m *message) error 

//傳送訊息

err := t.put(m)

if err != nil

atomic.adduint64(&t.messagecount, 1) //累計訊息條數

atomic.adduint64(&t.messagebytes, uint64(len(m.body))) //累計位元組總數

return nil

}

func (t *topic) put(m *message) error 

return nil

}

其他函式:

delete/close: topic 退出結束

pause/unpause:topic 暫停/重啟

flush:將記憶體佇列的訊息,全部重新整理到磁碟進行持久化(exit 操作的時候)

總結:今天主要分析了topic的**實現,在這裡主要需要關注的是 topic 如何接收訊息(pub),又如何將訊息傳送給 channel(messagepump),最後需要關注的是什麼時候將訊息儲存在記憶體,什麼時候儲存在磁碟。

下次分享:channel的**實現

NSQ原始碼分析之概述

目前,看了nsqlookupd的 寫的真的很精美,我覺得 可以和redis相媲美,這等後續分析 時再詳說 關於nsq的特性,可以檢視nsq官網 這篇文章主要分析以下幾點 nsq提供了三大元件以及一些工具,三大元件為 nqsd nsq主要元件,用於儲存訊息以及分發訊息 nsqlookupd 用於管理n...

NSQ 原始碼分析之NSQD ProtocolV2

今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。詳細流程參考 中的邏輯流程圖。主要 檔案 1.nsqd protocol v2.go io...

NSQ 原始碼分析之NSQD Channel

今天主要講的是nsq channel 的 實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。主要 檔案 1.nsqd channel.go channel結構體 type channel structnewchannel 主要實現channel...