接收kafka訊息

2021-10-09 17:28:14 字數 1327 閱讀 3391

kafka:

server: 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083,127.0.0.1:8084

topics: eseal-hr-test-2

gourp:

id: hthr

@value("$")

string servers;

@value("$")

string groupid;

@value("$")

boolean autocommit;

@value("$")

int autocommitinterval;

@value("$")

int sessiontimeout;

@value("$")

string key;

@value("$")

string value;

@value("$")

string topic;

@value("$")

string resetoffset;

@postconstruct

public void initkafka(), key = {}, value = {}", record.offset(), record.key(), record.value());

messagenoticedto dto = jsonutil.tobean(record.value(), messagenoticedto.class);

log.info("返回資訊:"+jsonutil.tojson(dto));

if(null != dto)

}}catch (exception e)}}

});//啟動kafka增加開關控制

string kafkaflag = profileclient.getprofilevaluebyoptions("hhr.onboard.kafka.exc.flag");

if(stringutil.equalsignorecase(kafkaflag,"y"))

}

@postconstruct該註解被用來修飾乙個非靜態的void()方法。被@postconstruct修飾的方法會在伺服器載入servlet的時候執行,並且只會被伺服器執行一次。postconstruct在建構函式之後執行,init()方法之前執行。

通常我們會是在spring框架中使用到@postconstruct註解 該註解的方法在整個bean初始化中的執行順序:

constructor(構造方法) -> @autowired(依賴注入) -> @postconstruct(注釋的方法)

kafka 訊息傳送和接收

傳送 例項 public class kafkaproducerdemo extends thread override public void run else catch interruptedexception e catch executionexception e num try catc...

go 實現 kafka 訊息傳送 接收

kafka是訊息中介軟體的一種,是一種分布式流平台,是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast 快 等優點。生產者 producer 將訊息記錄 record 傳送到kafka中的主題中 topic 乙個主題可以有多個分割槽 partition 訊息最終儲存在...

通過kafka傳送和接收訊息

生產者配置類 configuration enablekafka public class kafkaproducerconfig private string address value private string batchsize value private string linger pu...