storm程式設計指南

2022-07-13 18:27:12 字數 3529 閱讀 1344

@(部落格文章)[storm|大資料]

目錄本文介紹了storm的基本程式設計,關於trident的程式設計,請見???

本示例使用storm執行經典的wordcount程式,拓撲如下:

sentence-spout—>split-bolt—>count-bolt—>report-bolt

分別完成句子的產生、拆分出單詞、單詞數量統計、統計結果輸出

完整**請見

以下是關鍵**的分析。

public class sentencespout extends baserichspout ;

public void open(map conf, topologycontext context,

spoutoutputcollector collector)

public void declareoutputfields(outputfieldsdeclarer declarer)

public void nexttuple()

try catch (interruptedexception e)

}}

上述類中,將string陣列中內容逐行傳送出去,主要的方法有:

(1)open()方法完成spout的初始化工作,與bolt的prepare()方法類似

(2)declareoutputfileds()定義了傳送內容的欄位名稱與字段數量,bolt中的方法名稱一樣。

(3)nexttuple()方法是對每乙個需要處理的資料均會執行的操作,也bolt的executor()方法類似。它是整個邏輯處理的核心,通過emit()方法將資料傳送到拓撲中的下乙個節點。

public class splitsentencebolt extends baserichbolt

public void declareoutputfields(outputfieldsdeclarer declarer)

public void execute(tuple input)

}}

三個方法的含義與spout類似,這個類根據空格把收到的句子進行拆分,拆成乙個乙個的單詞,然後把單詞逐個傳送出去。

input.getstringbyfield("sentence」)可以根據上一節點傳送的關鍵字獲取到相應的內容。

public class wordcountbolt extends baserichbolt

public void declareoutputfields(outputfieldsdeclarer declarer)

public void execute(tuple input)

count++;

this.counts.put(word, count);

this.collector.emit(new values(word,count));

}}

本類將接收到的word進行數量統計,並把結果傳送出去。

這個bolt傳送了2個filed:

declarer.declare(new fields("word","count"));

this.collector.emit(new values(word,count));

public class reportbolt extends baserichbolt

public void declareoutputfields(outputfieldsdeclarer declarer)

public void execute(tuple input)

public void cleanup()

super.cleanup();

}

}

本類將從wordcount-bolt接收到的資料進行輸出。

先將結果放到乙個map中,當topo被關閉時,會呼叫cleanup()方法,此時將map中的內容輸出。

public class wordcounttopology  catch (interruptedexception e) 

cluster.killtopology(topology_name);

cluster.shutdown();

} else catch (alreadyaliveexception e) catch (invalidtopologyexception e) }}

}

關鍵步驟為:

(1)建立topologybuilder,並為這個builder指定spout與bolt

builder.setspout(sentence_spout_id, spout);

builder.setbolt(split_bolt_id, splitbolt).shufflegrouping(

sentence_spout_id);

builder.setbolt(count_bolt_id, countbolt).fieldsgrouping(split_bolt_id,

new fields("word"));

builder.setbolt(report_bolt_id, reportbolt).globalgrouping(

count_bolt_id);

(2)建立conf物件

config conf = new config();
這個物件用於指定一些與拓撲相關的屬性,如並行度、nimbus位址等。

(3)建立並執行拓撲,這裡使用了2種方式

一是當沒有引數時,建立乙個localcluster,在本地上直接執行,執行10秒後,關閉集群:

localcluster cluster = new localcluster();

cluster.submittopology(topology_name, conf,builder.createtopology());

thread.sleep(10000);

cluster.killtopology(topology_name);

cluster.shutdown();

二是有引數是,將拓撲提交到集群中:

stormsubmitter.submittopology(args[0], conf,builder.createtopology());
第乙個引數為拓撲的名稱。

6、本地執行

直接在eclipse中執行即可,輸出結果在console中看到

7、集群執行

(1)編譯並打包

mvn clean compile
(2)把編譯好的jar包上傳到nimbus機器上,然後

storm jar com.ljh.storm.5_stormdemo  com.ljh.storm.wordcount.wordcounttopology  topology_name
將拓撲提交到集群中。

Storm 程式設計模型

元組 tuple 是訊息傳遞的基本單元,是乙個命名的值列表,元組中的字段可以是任何型別的對 象。storm使用元組作為其資料模型,元組支援所有的基本型別 字串和位元組陣列作為字段值,只要實現 型別的序列化介面就可以使用該型別的物件。元組本來應該是乙個key value的map,但是由於各個組 件間傳...

Storm 程式設計模型

元組 tuple 是訊息傳遞的基本單元,是乙個命名的值列表,元組中的字段可以是任何型別的對 象。storm使用元組作為其資料模型,元組支援所有的基本型別 字串和位元組陣列作為字段值,只要實現 型別的序列化介面就可以使用該型別的物件。元組本來應該是乙個key value的map,但是由於各個組 件間傳...

Storm核心元件 程式設計模型

storm簡介 storm是用來做實時計算的框架,所以介紹storm之前需要知道什麼是流式計算。流式計算 資料實時產生 資料實時傳輸 資料實時計算 實時展示 代表技術 flume實時獲取資料 kafka metaq實時資料儲存 storm jstorm實時資料計算 redis實時結果快取 持久化儲存...