使用Storm實現WordSum

2021-09-24 17:17:30 字數 2152 閱讀 9598

3.2 mybolt 類

3.3 測試類

spout類extends baserichspout,baserichspout extends basecomponent implements irichspout,irichspout extends ispout;分析ispout幾個方法:

1、void open(map conf, topologycontext context, spoutoutputcollector collector);

任務呼叫的時候,在乙個worker上初始化;提供了集群拓撲作業的配置資訊、當前作業的任務資訊、collector用來傳送封裝的tuples單元

2、void nexttuple();

strom要求spout傳送資料給output collector,非阻塞式方法,如果沒有資料傳送,該方法就會return;

bolt類extends baserichbolt,baserichbolt實現irichbolt,irichbolt繼承ibolt,接下來分析ibolt幾個方法:

1、 void prepare(map stormconf, topologycontext context, outputcollector collector);

根據注釋,當任務來了的時候,這個會在集群中的某個worker節點被初始化,他提供了bolt的執行環境。3個引數:stormconf給當前bolt準備的配置物件;context可以獲取任務的位置資訊,包括任務id和元件id、輸入輸出流的資訊;collector傳送資料

2、 void execute(tuple input);

處理乙個單一的tuple輸入流,元組物件包含了metadata元資料資訊(封裝了傳送的資料的來自於哪個元件、哪個流、哪個任務),value值可以被獲取到tuple物件的getvalue方法

當然,它們都有乙個共同的方法-declareoutputfields,負責給所有的流宣告了output輸出策略。

在這個類中,我們首先要在open方法中初始化,然後在nexttuple方法中,不停的採集資料、向後傳送資料。在呼叫collector的emit方法向後發射資料的時候,要對後面的bolt宣告傳送資料的欄位名稱。類似於android中使用intent、sp傳值時定義的型別、名稱。

public class myspout extends baserichspout

/*** 採集,向後傳送資料

* */

@override

public void nexttuple()

/*** 向接收資料的邏輯處理單元傳送資料的欄位名稱

* */

@override

public void declareoutputfields(outputfieldsdeclarer declarer)

}

看values()的實現,它其實就是乙個可變陣列,裡面在不停的迴圈:

public

values

(object.

.. vals)

}

這個類的作用就是接受上乙個spout傳送過來的資料,並求和累加。過程還是首先在prepare方法中進行初始化,然後在execute方法中根據spout定義的傳送資料欄位名稱,來獲取到傳遞過來的資料。很顯然這一步就能滿足需求,無需再繼續向後發射資料了。

public

class

mybolt

extends

baserichbolt

/** * 獲取資料,有必要的話,向後繼續傳送資料

* */

@override

public

void

execute

(tuple input)

@override

public

void

declareoutputfields

(outputfieldsdeclarer declarer)

}

首先要構建拓撲結構,並設定spout、bolt,指定分發策略。這裡才用的是shufflegrouping的分發策略。最後建立本地化集群,將我們的作業提交到集群執行即可。

public

class

test

}

使用Storm實現WordCount

這裡用到的bolt可能會多一些,乙個spout負責推送資料,乙個bolt負責切詞,再來乙個bolt負責統計。最關鍵的是,相同的單詞應該交給同乙個bolt來處理,分發策略的選用就得嚴謹一些了,依據分發的單詞來分發 field 這個類就負責將準備的資料向後傳送,除此之外,什麼都不做。public cla...

storm使用範例

此案例實現從陣列中隨機讀取字串傳送到bolt,bolt將字串變成大寫傳送到下乙個bolt,bolt將字串加上時間戳然後寫到檔案中 public class randomwordspout extends baserichspout 初始化方法,在spout元件例項化時呼叫一次 override pu...

storm 使用streamid的例子

有時同乙個spout或者bolt需要發出多類不同的訊息。如對乙個字串拆分為單詞後,將各單詞一一傳送給各節點,傳送完後再傳送一條結束的任務。使用方法如下 1.spout的declareoutputfields方法中定義多組stream override publicvoid declareoutput...