Kafka Broker原始碼解析二 API層設計

2022-09-13 03:30:10 字數 1949 閱讀 8993

2.1 核心邏輯

相對於網路層,api層的實現相當簡單,只是對底層實現的封裝,得益於此,從api層幾乎就能了解到kafka broker所能提供的全部功能

啟動num.io.threads個requesthandler

每個requesthandler都從全域性requestqueue中poll request

根據request的apikeys呼叫對應的handle*api進行實際的業務邏輯處理

2.2 核心類、方法介紹

kafkarequesthandler

|-- run()

kafkaapis

|-- handle() // 所有請求的入口,根據apikeys分發請求

3.1 啟動流程
啟動流程前半部分和網路層一致,這裡不再贅述

// kafkaserver.scala

def startup()

// kafkarequesthandler.scala

class kafkarequesthandlerpool(val brokerid: int,

val requestchannel: requestchannel,

val apis: kafkaapis,

time: time,

numthreads: int) extends logging with kafkametricsgroup

def createhandler(id: int): unit = synchronized

}

實際就是例項化api層的入口,再啟動num.io.threads個requesthandler執行緒,用於實際處理請求,值得注意的是這裡的執行緒池,其實不是真正意義上的執行緒池,執行緒數目是固定的,只有通過動態引數改變執行緒池大小時,才會重新調整執行緒數目

def resizethreadpool(newsize: int): unit = synchronized 

} else if (newsize < currentsize)

}threadpoolsize.set(newsize)

}

3.2 請求分發流程
requesthandler執行緒啟動後,每個執行緒都一直從網路層獲取request再交給kafkaapis進行請求分發

// kafkarequesthandler.scala

def run()

}shutdowncomplete.countdown()

}// requestchannel.scala

private val requestqueue = new arrayblockingqueue[baserequest](queuesize)

def receiverequest(timeout: long): requestchannel.baserequest =

requestqueue.poll(timeout, timeunit.milliseconds)

// kafkaapis.scala

def handle(request: requestchannel.request)

}

requesthandler從全域性阻塞佇列獲取網路層組裝完的request,並呼叫kafkaapi的handle方法

handle方法類似web開發中的dispatch,根據apikeys呼叫對應的handle*執行實際業務邏輯

可以看到api層只是對執行執行緒、根據apikeys進行請求路由的封裝。實際邏輯由kafkaapis類中的handle*方法實現,而其中最核心的方法為produce與fetch請求,分別對應訊息的生產與消費,下面的文章將專門針對這兩種api進行原始碼分析

azkaban web server原始碼解析

azkaban主要用於hadoop相關job任務的排程,但也可以應用任何需要排程管理的任務,可以完全代替crontab。azkaban主要分為web server 任務上傳,管理,排程 executor server 接受web server的排程指令,進行任務執行 1.資料表 projects 工...

JDK LinkedHashMap原始碼解析

今天來分析一下jdk linkedhashmap的源 public class linkedhashmapextends hashmapimplements map可以看到,linkedhashmap繼承自hashmap,並且也實現了map介面,所以linkedhashmap沿用了hashmap的大...

Redux原始碼createStore解讀常用方法

const store createstore reducer,preloadedstate enhancer 直接返回當前currentstate,獲取state值,return state 我覺得應該深轉殖乙個新的物件返回,不然有可能會被外部修改 function getstate consol...