Stream基礎篇 Stream入門應用

2021-10-04 21:30:19 字數 2981 閱讀 5412

分割槽是有狀態處理中的乙個關鍵概念,無論是效能還是一致性的原因,分割槽都是至關重要的,當生產者將訊息資料傳送給多個消費者例項時,保證擁有共同特徵的訊息資料始終是由同乙個消費者例項接收和處理。。例如,在時間窗平均計算示例中,來自任何給定感測器的所有測量值都由相同應用程式例項處理是很重要的。

注:要設定分割槽處理方案,您必須配置資料生成和資料消耗兩端。

本章概要

1、工程結構說明;

2、構建消費者工程例項;

3、自定義channel;

4、生產者另一種實現;

工程結構說明

1、主要構建如下兩個模組:

其中receiver也就是我們常說的消費者,sender自然就是生產者。。本小節主要例子將集中在receiver工程實現。

2、在父pom.xml中新增如下主要依賴:

utf-8

1.8 org.springframework.boot spring-boot-starter-parent 1.5.6.release

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-starter-logging

org.springframework.cloud spring-cloud-dependencies dalston.sr5 pom import

構建消費者工程例項

stream-receiver工程實現如下:

1、在pom.xml中新增如下依賴:

org.springframework.cloud

spring-cloud-starter-stream-rabbit

其等價使用spring-cloud-stream-binder-rabbit依賴。

@test

public void sinksendertester()

public inte***ce sinksender

}

note:

如上生產者定義主要參考org.springframework.cloud.stream.messaging.source定義:

public inte***ce source

為了與接收訊息的通道一致,故修改@output註解引數為sink.input;

6、啟動服務,可以看到如下log,宣告了乙個queue,並新增了訂閱者:

7、檢視rabbitmq控制台的queus頁籤中存在上述佇列:

8、並執行單元測試可以看到如下log資訊:

小節,以上即實現了乙個最簡單的demo示例。

自定義channel

上一小節採用預設預定義的sink已經實現了訊息的消費,並模擬source定義了訊息的生產者,同理,本小節將參考源**sink設計,實現自定義channel的訊息處理。

1、首先簡單看下sink的定義:

public inte***ce sink

note:

其通道名稱定義為input;

@input註解表示輸入通道,當前應用將接收訊息;對應的@output註解表示輸出通道,當前應用將傳送訊息;註解引數表示通道的名稱,如果不提供名稱,則當前註解方法名即作為通道名稱;

必須以subscribablechannel作為返回型別;

2、參考sink的定義,自定義如下,通道名稱為myinput:

package com.cloud.shf.stream.sink;

public inte***ce mysink

3、在@enablebinding註解中加入當前定義的mysink介面:

package com.cloud.shf.stream.sink;

@enablebinding(value = )

public class sinkreceiver channel : {}」, mysink.channel, payload.tostring());}}

note:

必須在@enablebinding註解加入新定義的消費訊息介面,否則無法被註冊;

5、啟動服務,可以看到如下log:

6、佇列中新增訊息佇列:

7、執行單元測試後,可以看到接收到如下訊息:

小節:以上即實現了自定義訊息通道,比較簡單。。

生產者另一種實現

上述兩個小節中的生產者均通過直接注入定義的生產介面獲取messagechannel例項,然後傳送訊息,其實也可以直接注入messagechannel例項來完成訊息的傳送,本小節將實現模擬。

1、在mysink繼續新增如下兩個通道定義:

package com.cloud.shf.stream.sink;

public inte***ce mysink channel : {}」, mysink.output1_channel, payload.tostring());

}@streamlistener(mysink.output2_channel)

public void myreceive2(object payload) channel : {}」, mysink.output2_channel, payload.tostring());

}

@resource(name = mysink.output2_channel)

private messagechannel send2;

@test

public void myoutputsourcetester()

public inte***ce myoutputsource

}

note:

直接注入messagechannel例項,並採用通道名稱;

4、執行單元測試:

nodeJS基礎 Stream用法

stream是nodejs的乙個核心模組,在nodejs中應用非常廣泛,比如http 伺服器request和response物件都是流 可讀流的用法 let fs require fs let path require path let rs fs.createreadstream path.joi...

Stream概述以及建立Stream物件

stream是用來操作容器中的資料,例如過濾,對映,規約,排序,查詢記錄等等 stream是和cpu打交道 集合關注的是資料的儲存,是和記憶體打交道 總結 集合說的是資料,stream說的是計算 注意 stream 自己不會儲存元素 資料 資料仍然是在集合當中。類似於迭代器,迭代器是用來遍歷集合的,...

Stream 1 關於Stream的知識分享

一 什麼是stream 查了一下msdn,他是這麼解釋的 提供位元組序列的一般檢視。這個解釋有點太籠統了,下面,我們來仔細的捋一下 1 什麼是位元組序列?位元組序列指的是 位元組物件被儲存為連續的位元組序列,位元組按照一定的順序進行排序組成了位元組序列。那麼關於流的解釋可以抽象為下列情況 一條河中有...