storm 使用streamid的例子

2021-09-02 18:34:16 字數 1464 閱讀 5780

有時同乙個spout或者bolt需要發出多類不同的訊息。如對乙個字串拆分為單詞後,將各單詞一一傳送給各節點,傳送完後再傳送一條結束的任務。

使用方法如下:

1.spout的declareoutputfields方法中定義多組stream:

@override

publicvoid

declareoutputfields(outputfieldsdeclarer

declarer)

在emit時指定streamid,第乙個引數。如

_collector

.emit(

"working"

,new

values(

word

, filename

));

_collector

.emit(

"stop"

,new

values(

filename

));

注,是第乙個引數,和tupleid不是一回事,tupleid是在values引數後面。

2.bolt如何使用

publicvoid

execute(tuple

tuple)

if

(streamid

.equals(

"stop"))

} 3.topology中如何定義呢:

在流分組的第二個引數上:

builder

.setspout(

"wordspout"

,new

wordspout(), 4);

builder

.setbolt(

"countbolt"

,new

countbolt(), 4)

.fieldsgrouping(

"wordspout"

, "working"

,new

fields(

"word"))

.allgrouping(

"wordspout"

, "stop"

);

storm使用範例

此案例實現從陣列中隨機讀取字串傳送到bolt,bolt將字串變成大寫傳送到下乙個bolt,bolt將字串加上時間戳然後寫到檔案中 public class randomwordspout extends baserichspout 初始化方法,在spout元件例項化時呼叫一次 override pu...

使用Storm實現WordSum

3.2 mybolt 類 3.3 測試類 spout類extends baserichspout,baserichspout extends basecomponent implements irichspout,irichspout extends ispout 分析ispout幾個方法 1 vo...

使用Storm實現WordCount

這裡用到的bolt可能會多一些,乙個spout負責推送資料,乙個bolt負責切詞,再來乙個bolt負責統計。最關鍵的是,相同的單詞應該交給同乙個bolt來處理,分發策略的選用就得嚴謹一些了,依據分發的單詞來分發 field 這個類就負責將準備的資料向後傳送,除此之外,什麼都不做。public cla...