kafka系列 多執行緒消費者實現

2022-02-27 02:47:09 字數 3016 閱讀 1868

看了一下kafka,然後寫了消費kafka資料的**。感覺自己功力還是不夠。

不能隨心所欲地運算元據,資料結構沒學好,spark的rdd操作沒學好。

不能很好地組織**結構,設計模式沒學好,物件導向思想理解不夠成熟。

用佇列來儲存要消費的資料。

用佇列來儲存要提交的offest,然後處理執行緒將其給回消費者提交。

每個分割槽開乙個處理執行緒來處理資料,分割槽與處理器的對映放在map中。

當處理到一定的數量或者距離上一次處理一定的時間間隔後, 由poll執行緒進行提交offset。

不好的地方:

每次處理的資料太少,而且每個資料都進行判斷其分割槽是否已經有處理執行緒在處理了。

獲取topic不太優雅。

流程圖

下面是多執行緒消費者實現:

/**

* 負責啟動消費者執行緒msgreceiver, 儲存消費者執行緒msgreceiver, 儲存處理任務和執行緒recordprocessor, 以及銷毀這些執行緒

* created by stillcoolme on 2018/10/12.

*/public class kafkamultiprocessormain

public static void main(string args)

private void init(string consumerproppath)

for (int i = 0; i < threadsnum; i++)

logger.info("finish creating" + threadsnum + " threads to consume kafka warn msg");

}//銷毀啟動的執行緒

public void destroy()

private void closerecordprocessthreads()

logger.debug("finish interrupting record process threads");

}private void closekafkaconsumer()

logger.debug("finish interrupting consumer threads");

}private mapgetconsumerconfig()

/*** 獲取消費者引數

** @param propath

*/private void getconsumerprops(string propath) else

consumerprops.load(instream);

} catch (ioexception e) finally catch (ioexception e) }}

}}

/**

* 負責呼叫 recordprocessor進行資料處理

* created by zhangjianhua on 2018/10/12.

*/public class msgreceiver implements runnable

@override

public void run()

//最多輪詢1000ms

consumerrecordsrecords = kafkaconsumer.poll(1000);

if (records.count() > 0)

for (consumerrecord record : records)

//有 processor 可以處理該分割槽的 record了

processtask.addrecordtoqueue(record);

}} catch (exception e)

}} finally }}

public class recordprocessor implements runnable

@override

public void run()

//提交偏移給queue中

committoqueue();

} catch (interruptedexception e) }}

//將當前的消費偏移量放到queue中, 由msgreceiver進行提交

private void committoqueue()

//如果消費了設定的條數, 比如又消費了commitlength訊息

boolean arrivedcommitlength = this.completetask % commitlength == 0;

//獲取當前時間, 看是否已經到了需要提交的時間

localdatetime currenttime = localdatetime.now();

boolean arrivedtime = currenttime.isafter(lasttime.plus(committime));

if(arrivedcommitlength || arrivedtime)

}//consumer執行緒向處理執行緒的佇列中新增record

public void addrecordtoqueue(consumerrecordrecord) catch (interruptedexception e)

}private void process(consumerrecordrecord)

}

對處理程式recordprocessor進行抽象,抽象出basepropessor父類。以後業務需求需要不同的處理程式recordprocessor就可以靈活改變了。

反射來構建recordprocessor??在配置檔案配置具體要new的recordprocessor類路徑,然後在建立msgreceiver的時候傳遞進去。

參考kafka consumer多執行緒例項 : 如這篇文章所說的維護了多個worker來做具體業務處理,這篇文章用的是threadpoolexecutor執行緒池。

Kafka學習筆記 多執行緒開發消費者

從 kafka 0.10.1.0 版本開始,kafkaconsumer 就變為了雙線程的設計,即使用者主線程和心跳執行緒。所謂使用者主線程,就是你啟動 consumer 應用程式 main 方法的那個執行緒,而新引入的心跳執行緒 heartbeat thread 只負責定期給對應的 broker 機...

多執行緒系列之生產者和消費者

在之前接觸過pv操作的,應該對於生產者和消費者的情況有乙個了解,這裡學到多執行緒同步的時候,最恰當的乙個例子。pv操作就不多做解釋。生產者和消費者 author bobo public class producerconsumer 產品 窩頭 class wotou public string to...

Java多執行緒實現,生產者消費者

根據自己的理解簡單的實現了乙個,生產者,消費者模式的多執行緒,請大家多提寶貴意見 sleep wait 比較 sleep 是thread的靜態方法,是用來修改執行緒自身的執行方式。執行緒睡眠時間不會釋放鎖,睡眠完成自動開始執行。wait 是object類中的方法,用作執行緒之間的通訊,被其他執行緒呼...