Spring Cloud Stream使用細節

2021-09-14 06:34:09 字數 4230 閱讀 8359

上篇文章我們看了spring cloud stream的基本使用,小夥伴們對spring cloud stream應該也有了乙個基本的了解,但是上篇文章中的訊息我們是從rabbitmq的web管理頁面發來的,如果我們想要從**中傳送訊息呢?本文我們就來看看spring cloud stream的一些使用細節。

1.使用spring cloud搭建服務註冊中心

2.使用spring cloud搭建高可用服務註冊中心

3.spring cloud中服務的發現與消費

4.eureka中的核心概念

5.什麼是客戶端負載均衡

6.spring resttemplate中幾種常見的請求方式

7.resttemplate的逆襲之路,從傳送請求到負載均衡

8.spring cloud中負載均衡器概覽

9.spring cloud中的負載均衡策略

10.spring cloud中的斷路器hystrix

11.spring cloud自定義hystrix請求命令

12.spring cloud中hystrix的服務降級與異常處理

13.spring cloud中hystrix的請求快取

14.spring cloud中hystrix的請求合併

15.spring cloud中hystrix儀錶盤與turbine集群監控

16.spring cloud中宣告式服務呼叫feign

17.spring cloud中feign的繼承特性

18.spring cloud中feign配置詳解

19.spring cloud中的api閘道器服務zuul

20.spring cloud zuul中路由配置細節

21.spring cloud zuul中異常處理細節

22.分布式配置中心spring cloud config初窺

23.spring cloud config服務端配置細節(一)

24.spring cloud config服務端配置細節(二)之加密解密

25.spring cloud config客戶端配置細節

26.spring cloud bus之rabbitmq初窺

27.spring cloud bus整合rabbitmq

28.spring cloud bus整合kafka

29.spring cloud stream初窺

上篇文章我們提到了sink和source兩個介面,這兩個介面中分別定義了輸入通道和輸出通道,而processor通過繼承source和sink,同時具有輸入通道和輸出通道。這裡我們就模仿sink和source,來定義乙個自己的訊息通道。

還是在上文的基礎上,首先我們定義乙個介面叫做mysink,如下:

public inte***ce mysink
這裡我們定義了乙個名為mychannel的訊息輸入通道,@input註解的引數則表示了訊息通道的名稱,同時我們還定義了乙個方法返回乙個subscribablechannel物件,該物件用來維護訊息通道訂閱者。然後,我們再定義乙個名為mysource的介面,如下:

public inte***ce mysource
@output註解中描述了訊息通道的名稱,還是mychannel,然後這裡我們也定義了乙個返回messagechannel物件的方法,該物件中有乙個向訊息通道傳送訊息的方法。

最後我們定義乙個訊息接收類,如下:

@enablebinding(value = )

public class sinkreceiver2

}

ok,我們在這裡繫結訊息通道,然後監聽自定義的訊息通道,最後來乙個單元測試測試一下,如下:

@runwith(springjunit4classrunner.class)

@enablebinding(mysource.class)

@autowired

private mysource mysource;

@test

public void contextloads()

}

執行單元測試,我們可以看到如下日誌,表示訊息傳送成功了:

如果想要傳送物件也可以直接傳送,不用進行物件轉換,如下:

傳送:

book book = new book(1l, "三國演義", "羅貫中");

mysource.output().send(messagebuilder.withpayload(book).build());

接收:

@streamlistener(mysink.input)

public void receive(book playload)

如果我們想要在接收成功後給乙個回執,也是ok的,如下:

@streamlistener(mysink.input)

@sendto(source.output)//定義回執傳送的訊息通道

public string receive(book playload)

方法的返回值就是回執訊息,回執訊息在系統預設的output通道中,我們如果想要接收這個訊息,當然就要監聽這個通道,如下:

@streamlistener(source.output)

public void receive2(string msg)

當然要記得source類也要在@enablebinding註解中進行繫結。此時執行結果如下:

由於我們的服務可能會有多個例項同時在執行,如果不做任何設定,此時傳送一條訊息將會被所有的例項接收到,但是有的時候我們可能只希望訊息被乙個例項所接收,這個需求我們可以通過訊息分組來解決。方式很簡單,給專案配置訊息組和主題,如下:

spring.cloud.stream.bindings.mychannel.group=g1

spring.cloud.stream.bindings.mychannel.destination=dest1

這裡我們設定該工程都屬於g1消費組,輸入通道的主題名則為dest1。這裡配置完成之後,我們在訊息傳送方做如下配置:

spring.cloud.stream.bindings.mychannel.destination=dest1
也配置訊息主題名為dest1(如果傳送和接收就在同乙個應用中,則這裡可以不配置)。ok,此時我們將我們的專案啟動兩個例項,注意兩個例項的埠不一樣,此時如果我們再傳送訊息,則只會被兩個例項中的乙個接收到,另外乙個應用則接收不到,但是到底是兩個例項中的哪乙個接收,則是不確定的。

有的時候,我們可能需要相同特徵的訊息能夠總是被傳送到同乙個消費者上去處理,如果我們只是單純的使用消費組則無法實現功能,此時我們需要借助於訊息分割槽,訊息分割槽之後,具有相同特徵的訊息就可以總是被同乙個消費者處理了,配置方式如下(這裡的配置都是在消費組的配置基礎上完成的):

在消費者上新增如下配置:

spring.cloud.stream.bindings.mychannel.consumer.partitioned=true

spring.cloud.stream.instance-count=2

spring.cloud.stream.instance-index=0

關於這個配置我說三點:

1.第一行表示開啟訊息分割槽

2.第二行表示當前訊息者的總的例項個數

3.第三行表示當前例項的索引,從0開始,當我們啟動多個例項時,需要在啟動時在命令列配置索引

然後在訊息生產者上新增如下配置:

spring.cloud.stream.bindings.mychannel.producer.partitionkeyexpression=payload

spring.cloud.stream.bindings.mychannel.producer.partitioncount=2

第一行配置設定了分割槽鍵的表示式規則,第二行則設定了訊息分割槽數量。

ok,此時我們再次啟動多個消費者例項,然後重**送多條訊息,這些訊息都將被同乙個消費者處理掉。

Spring Cloud Stream使用入門

前面博文嘗試使用了srping cloudbus,裡面引入了spring cloud starter bus kafka和spring cloud starter bus amqp,實時上它們分別依賴了spring cloud starter stream kafka和spring cloud st...

spring cloud stream基本使用

spring cloud stream 通過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。通過向應用程式暴露統一的channel通道,使得應用程式不需要再考慮各種不同的訊息中介軟體的實現 spring cloud stream 中的訊息通訊方式遵循了發布 訂閱模式,當一條訊...

SpringCloud stream 訊息分割槽

1.stream的分割槽是當訊息的提供者傳送了相同的訊息的時候,如果被集群的中的某個節點消費了那麼如果提供者在此傳送相同的訊息的時候 一致會被同乙個的消費者消費掉 分割槽的配置 提供者端需要配置的資訊 新增rabbitmq資訊 spring.rabbitmq.host 192.168.177.140...