springboot kafka最簡易入門Demo

2021-09-10 09:07:40 字數 3203 閱讀 3397

1.pom.xml中加入kafka依賴:

org.springframework.kafka

spring-kafka

## kafka ##

spring.kafka.bootstrap-servers=127.0.0.1:9092

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.stringserializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.stringserializer

spring.kafka.consumer.group-id=test

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.stringdeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.stringdeserializer

spring.kafka.topic=mqtt_location_data

3.kafka訊息生產者:

import lombok.extern.slf4j.slf4j;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.beans.factory.annotation.value;

import org.springframework.kafka.core.kafkatemplate;

import org.springframework.scheduling.annotation.enablescheduling;

import org.springframework.scheduling.annotation.scheduled;

import org.springframework.stereotype.component;

import org.springframework.util.concurrent.listenablefuture;

/** * @description: kafka生產者

* @author:

* @create: 2019-01-29 17:38

**/@component

@slf4j

public class kafkaproducer ")

private string topic;

/*** 傳送kafka訊息

** @param jsonstring

*/public void send(string jsonstring)

}

4.kafka訊息消費者:

import lombok.extern.slf4j.slf4j;

import org.apache.kafka.clients.consumer.consumerrecord;

import org.springframework.kafka.annotation.kafkalistener;

import org.springframework.stereotype.component;

/** * @description: kafka消費者

* @author:

* @create: 2019-01-29 17:47

**/@component

@slf4j

public class kafkaconsumer ")

public void listen(consumerrecord, ?> record) , offset={}, message={}", record.topic(), record.offset(), record.value());}}

5.啟動類測試:

import kafka.producer.kafkaproducer;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.boot.commandlinerunner;

/** * description:

* author:

* date: 2018-09-30 09:15

**/public static void main(string args)

@autowired

kafkaproducer kafkaproducer;

@override

public void run(string... args) throws exception }}

結果:

配置引數補充:

#acks = 0 如果設定為零,則生產者將不會等待來自伺服器的任何確認,該記錄將立即新增到套接字緩衝區並視為已傳送。在這種情況下,無法保證伺服器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設定為-1。

#acks = 1 這意味著leader會將記錄寫入其本地日誌,但無需等待所有副本伺服器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄後立即失敗,但在將資料複製到所有的副本伺服器之前,則記錄將會丟失。

#acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少乙個同步副本伺服器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設定。

#可以設定的值為:all, -1, 0, 1

spring.kafka.producer.acks=1

#該值大於零時,表示啟用重試失敗的傳送次數

spring.kafka.producer.retries=3

CMake簡易入門

首發於fxm5547的部落格 cmake minimum required version 2.6 project itest c 標準 set cmake cxx standard 11 指定參與編譯的原始檔 add executable itest src main.cpp src cal ca...

MySQL簡易入門

mysql 是乙個網際網路繞不過去的坎,總覺得很簡單,一切似乎都圍繞著 curd,但是不能脫離這個核心,本次的部落格其 mysql 的一些基本概念作為主題,力求用自己的語言,將其中的概念說清楚。今天是在銀川的第二天,第一次坐完飛機,現在在見家長的過程中,偷得片刻悠閒,寫點東西作為總結 mysql 可...

Python簡易入門

字串可用單引號 雙引號和三引號。轉義字元 可以轉義很多字元,比如 n表示換行,t表示製表符,b表示退格符,字元 本身也需要轉義,因此,表示的字元就是 取子字串有兩種方法,一種是用 索引,一種是用切片運演算法 檢視變數型別 type 型別轉換 列表 list 元組 tuple 集合 set 字典 di...