kafka 請求處理與RPC原理解析 四

2021-08-22 03:24:53 字數 3045 閱讀 9065

kafka server啟動後,會監聽一些埠,然後開始接收請求進行日常的工作。

與請求處理相關的元件有 socketserver、kafkaapis、kafkarequesthandlerpool。這些都是在kafka server啟動時初始化並開始執行的。socketserver是乙個nio服務,基於n+m的執行緒模型,由n個acceptor執行緒以及m個processor執行緒組成,和netty的網路模型有點像。n個acceptor執行緒專門用於監聽連線事件,連線建立後將連線交給其他m個processor執行緒繼續監聽讀事件,這樣的執行緒模型使kafka可以很輕鬆的處理高併發的場景。

kakfa server在啟動時呼叫socketserver#startup()方法,這個方法內會初始化n個acceptor開始監聽op_accept事件,等待客戶端連線。初始化的acceptor數量取決於使用者配置的listeners有幾個。在初始化每個acceptor的同時,還會初始化m個processor,並分配給acceptor用於監聽連線事件。processor的數量取決於num.network.threads配置,該配置預設值是3,表示每個acceptor分配3個processor。

acceptor接收到乙個新的連線時,會將這個請求以輪詢的方式分配給它管理的其中乙個processor處理

processor收到乙個連線時,便開始監聽它的op_read事件

如果processor發現有請求發過來,就將這個請求放入request佇列中,等待處理。該request佇列的容量由配置queued.max.requests決定,改配置預設值是500.

kakfa server在啟動時會初始化kafkarequesthandlerpool類,該類在初始化時會構造一些的kafkarequesthandler執行緒並啟動,構造的kafkarequesthandler執行緒數量取決於配置num.io.threads的值,該配置預設值是8.。

kafkarequesthandler執行緒啟動後,會不斷自旋,從request queue中獲取請求,然後交給kafkaapis進行處理。kafkaapis根據請求的型別進行不同的業務處理

kafkaapis元件處理完後,會將結果放入對應的processor的response queue中,等待processor處理

processor也是乙個不斷自旋的執行緒,在自旋的過程中,processor會檢查自己的response queue中是否有新的結果,如果有新的結果就將其從佇列中取出,準備發回給客戶端

processor通過niochannel將結果寫回客戶端,自此乙個通訊流程結束

def startup() ", acceptor, false).start()

acceptor.awaitstartup()

processorbeginindex = processorendindex}}

info("started " + acceptors.size + " acceptor threads")

}

socketserver啟動時,會初始化n個acceptor,並為其分配好對應數量的processor,然後啟動acceptor執行緒。

def run()  catch }}

}catch

}} finally

}def accept(key: selectionkey, processor: processor) catch

}

acceptor執行緒啟動後,就開始監聽埠看有沒有新的連線進來。這裡使用nio實現無阻塞的監聽請求。收到請求後就分發給它管理的其中乙個processor執行緒處理。

def accept(socketchannel: socketchannel) 

override def run() catch

} debug("closing selector - processor " + id)

swallowerror(closeall())

shutdowncomplete()

}

processor執行緒拿到acceptor傳過來的請求後開始監聽該連線的讀請求。同時還會做許多事情。比如傳送響應、讀取請求、關閉連線等等。

kakfa server在啟動時會初始化kafkarequesthandlerpool類,該類在初始化時會構造一些的kafkarequesthandler執行緒並啟動,構造的kafkarequesthandler執行緒數量取決於配置num.io.threads的值,該配置預設值是8。

下面是kafkarequesthandler執行緒的run方法

def run() 

if(req eq requestchannel.alldone)

req.requestdequeuetimems = time.milliseconds

trace("kafka request handler %d on broker %d handling request %s".format(id, brokerid, req))

//使用kafkaapis處理請求

apis.handle(req)

} catch

}}

kafkarequesthandler執行緒不斷的從請求佇列中取出請求處理。具體的請求最後交給kafkaapis處理。

def handle(request: requestchannel.request) 

} catch else

} finally

request.apilocalcompletetimems = time.milliseconds

}

kafkaapis根據請求的型別執行不同的操作來處理請求。

在0.10.2版本中,kafkaapis可以處理21種型別的請求。

RPC實戰與核心原理之非同步RPC

提公升吞吐量,其實關鍵就兩個字 非同步 提高cpu等資源的利用率 非同步,最常用的方式就是返回 future 物件的 future 方式,或者入參為 callback 物件的 方式,而 future 方式可以說是最簡單的一種非同步方式了。我們發起一次非同步請求並且從請求上下文中拿到乙個 future...

IIS 處理請求 原理

有時候我們會發現當我們訪問乙個iis 時,使用 可以正常訪問,但是使用ip卻不行,這是什麼原因呢?原來iis可以使用乙個ip位址和埠繫結多個 這些 的ip位址與埠都一樣,因此在客戶端或瀏覽器中使用ip訪問iis會失敗,因為單單根據ip位址iis無法確定你要訪問的具體是哪乙個 此時必須使用主機名也就是...

Kafka 請求是怎麼被處理的

kafka broker 端處理請求的全流程 1.順序處理請求。while true 這個方法吞吐量太差,只適用於請求傳送非常不頻繁的系統。每個請求使用單獨執行緒處理。為每個入站請求都建立乙個新的執行緒來非同步處理。while true thread.start 該方法為每個請求都建立執行緒的做法開...