Storm常見模式 流聚合

2022-08-31 11:57:11 字數 3436 閱讀 8825

流聚合(stream join)是指將具有共同元組(tuple)欄位的資料流(兩個或者多個)聚合形成乙個新的資料流的過程。

從定義上看,流聚合和sql中表的聚合(table join)很像,但是二者有明顯的區別:table join的輸入是有限的,並且join的語義是非常明確的;而流聚合的語義是不明確的並且輸入流是無限的。

資料流的聚合型別跟具體的應用有關。一些應用把兩個流發出的所有的tuple都聚合起來——不管多長時間;而另外一些應用則只會聚合一些特定的tuple。而另外一些應用的聚合邏輯又可能完全不一樣。而這些聚合型別裡面最常見的型別是把所有的輸入流進行一樣的劃分,這個在storm裡面用fields grouping在相同字段上進行grouping就可以實現。

下面是對storm-starter(**見:中有關兩個流的聚合的示例**剖析:

先看一下入口類singlejoinexample

(1)這裡首先建立了兩個發射源spout,分別是genderspout和agespout:

feederspout genderspout = new feederspout(new fields("id", "gender"));

feederspout agespout = new feederspout(new fields("id", "age"));

topologybuilder builder = new topologybuilder();

builder.setspout("gender", genderspout);

builder.setspout("age", agespout);

其中genderspout包含兩個tuple欄位:id和gender,agespout包含兩個tuple欄位:id和age(這裡流聚合就是通過將相同id的tuple進行聚合,得到乙個新的輸出流,包含id、gender和age欄位)。

(2)為了不同的資料流中的同乙個id的tuple能夠落到同乙個task中進行處理,這裡使用了storm中的fileds grouping在id欄位上進行分組劃分:

builder.setbolt("join", new singlejoinbolt(new fields("gender", "age")))

.fieldsgrouping("gender", new fields("id"))

.fieldsgrouping("age", new fields("id"));

從中可以看到,singlejoinbolt就是真正進行流聚合的地方。下面我們來看看:

(1)singlejoinbolt構造時接收乙個fileds物件,其中傳進的是聚合後將要被輸出的字段(這裡就是gender和age欄位),儲存到變數_outfileds中。

(2)接下來看看完成singlejoinbolt的構造後,singlejoinbolt在真正開始接收處理tuple之前所做的準備工作(**見prepare方法):

a)首先,將儲存outputcollector物件,建立timecachemap物件,設定超時**介面,用於tuple處理失敗時fail訊息;緊接著記錄資料來源的個數:

_collector = collector;        int timeout = ((number) conf.get(config.topology_message_timeout_secs)).intvalue();

_pending = new timecachemap, map>(timeout, new expirecallback());

_numsources = context.getthissources().size();

b)遍歷topologycontext中不同資料來源,得到所有資料來源(這裡就是genderspout和agespout)中公共的filed欄位,儲存到變數_idfields中(例子中就是id欄位),同時將_outfileds中欄位所在資料來源記錄下來,儲存到一張hashmap中_fieldlocations,以便聚合後獲取對應的字段值。

setidfields = null;        for(globalstreamid source: context.getthissources().keyset()) }}

}_idfields = new fields(new arraylist(idfields));        

if(_fieldlocations.size()!=_outfields.size()) 

(3)好了,下面開始兩個spout流的聚合過程了(**見execute方法):

首先,從tuple中獲取_idfields欄位,如果不存在於等待被處理的佇列_pending中,則加入一行,其中key是獲取到的_idfields欄位,value是乙個空的hashmap物件,記錄globalstreamid到tuple的對映。

listid = tuple.select(_idfields);

globalstreamid streamid = new globalstreamid(tuple.getsourcecomponent(), tuple.getsourcestreamid());        if(!_pending.containskey(id)) 

從_pending佇列中,獲取當前globalstreamid streamid對應的hashmap物件parts中:

mapparts = _pending.get(id);
如果streamid已經包含其中,則丟擲異常,接收到同乙個spout中的兩條一樣id的tuple,否則將該streamid加入parts中:

if(parts.containskey(streamid)) throw new runtimeexception("received same side of single join twice");

parts.put(streamid, tuple);

如果parts已經包含了聚合資料來源的個數_numsources時,從_pending佇列中移除這條記錄,然後開始構造聚合後的結果字段:依次遍歷_outfields中各個字段,從_fieldlocations中取到這些outfiled欄位對應的globalstreamid,緊接著從parts中取出globalstreamid對應的outfiled,放入聚合後的結果中。

if(parts.size()==_numsources)
最後通過_collector將parts中存放的tuple和聚合後的輸出結果發射出去,並ack這些tuple已經處理成功。

_collector.emit( arraylist
否則,繼續等待兩個spout流中這個streamid都到齊後再進行聚合處理。

(4)最後,宣告一下輸出字段(**見declareoutputfields方法):

declarer.declare(_outfields);

storm的流分組

1.shuffle grouping 隨機分組 這種方式會隨機分發tuple給bolt的各個task,每個bolt例項接收到的相同數量的tuple。2.fields grouping 按欄位分組 根據指定欄位的值進行分組。比如說,乙個資料流根據 word 字段進行分組,所有具有相同 word 字段值...

Storm系列 十 聚流示例

功能 將多個資料來源的資料匯集到乙個處理單元進行集中分類處理 入口類testmain 1 publicclass testmain 19 資料來源類randomwordspout1 輸出欄位為name 1 publicclass randomwordspout1 extends baserichsp...

storm的流分組策略

關於storm的基礎,參照我這篇文章 流式計算storm 關於併發和並行,參照我這篇文章 併發和並行 關於storm的並行度解釋,參照我這篇文章 storm的並行度解釋 關於storm的流分組策略,參照我這篇文章 storm的流分組策略 關於storm的訊息可靠機制,參照我這篇文章 storm的訊息...