kafka原始碼分析 scheduler分析

2021-09-19 13:51:20 字數 926 閱讀 7173

kafka scheduler用於執行作業的排程程式。控制乙個在後台定期重複執行或者延遲排程的作業!主要有一下操作:

初始化任務以便可以接受任務排程 def startup()

當任務排程完成關閉。def shutdown()   延遲佇列實現

排程任務:def schedule(name: string, fun: ()=>unit, delay: long = 0, period: long = -1, unit: timeunit = timeunit.milliseconds)

詳細**如下:
class kafkascheduler(val threads: int, 

val threadnameprefix: string = "kafka-scheduler-",

daemon: boolean = true) extends scheduler with logging )

}}

override def shutdown()

cachedexecutor.awaittermination(1, timeunit.days)

}} def scheduleonce(name: string, fun: () => unit): unit =

def schedule(name: string, fun: () => unit, delay: long, period: long, unit: timeunit) catch finally

}if(period >= 0)

executor.scheduleatfixedrate(runnable, delay, period, unit)

else

executor.schedule(runnable, delay, unit)

}}

kafka原始碼分析 kafkaconsumer

kafkaconsumer 對於多執行緒訪問是不安全的,通過使用acquire 跟release 方法來操作atomiclong currentthread字段 儲存當前訪問執行緒id 有多個執行緒同時訪問丟擲concurrentmodificationexception,來防止對個執行緒同時訪問。...

Kafka原始碼分析(一)

apache kafka 是 乙個分布式流處理平台.這到底意味著什麼呢?我們知道流處理平台有以下三種特性 它可以用於兩大類別的應用 為了理解kafka是如何做到以上所說的功能,從下面開始,我們將深入探索kafka的特性。首先是一些概念 kafka有四個核心的api 讓我們首先深入了解下kafka的核...

Kafka原始碼分析之KafkaProducer

kafkaproducer是乙個kafka客戶端實現,可以發布記錄records至kafka集群。kafkaproducer是執行緒安全的,多執行緒之間共享單獨乙個producer例項通常會比多個producer例項要快。kafkaproducer包含一組快取池空間,儲存尚未傳輸到集群的記錄reco...