storm併發度理解

2022-09-15 04:24:13 字數 3573 閱讀 5448

乙個執行中的拓撲是由什麼組成的:worker程序,executors和tasks。

storm是按照下面3種主要的部分來區分storm集群中乙個實際執行的拓撲的:

worker程序、

executors (執行緒) 以及真正實施計算的

tasks(任務),先簡單回顧一下storm幾個核心概念:

同時還有幾個關鍵的元件:

下面給出一張經典的描述work,executor和task三個部分之間關係的示例

翻譯一下大致意思就是:

總結一下就是:乙個work(程序)裡可以包含多個executor(執行緒),乙個executor內部可以包含乙個或者多個task(任務),執行緒可以並行,但任務(task)只能序列執行

(1)1個worker程序執行乙個拓撲的子集,1個worker程序從屬於1個特定的拓撲,並執行著這個拓撲的1個或多個元件(spout或bolt)的1個或多個executor,乙個執行中的拓撲包括集群中的許多臺機器上的許多個這樣的程序。

(2)1個executor是1個worker程序生成的1個執行緒,它可能執行著某個元件(spout或bolt)的1個或多個task

(3)task執行著實際的資料處理,你用**實現的每乙個spout或bolt就相當於分布於整個集群中的許多個task。在1個拓撲的生命週期中,1個元件的task的數量總是一樣的,但是1個元件的executor(執行緒)的數量可以隨著時間而改變這意味著下面的條件總是成立:thread的數量 <= task的數量。預設情況下,task的數量與executor的數量一樣,例如,storm會在每1個執行緒執行1個task。

定義乙個簡單的拓撲

topologybuilder builder = new

topologybuilder();

builder.setspout(

"readspout

" , new

wordspout());

builder.setbolt(

"splitbolt

" , new wordsplit()).shufflegrouping("

readspout");

builder.setbolt(

"countbolt

" , new wordcounter()).fieldsgrouping("

splitbolt

" , new fields("

word

"));

stormtopology topology = builder .createtopology();

在這段**中,我們沒有設定併發度,也沒有設定worker的數量。storm預設就會給這個topology分配1個worker(程序),在這個worker啟動三個執行緒,1個用來執行wordspout,1個執行緒用來執行wordsplit,1個執行緒用來執行wordcounter。下面我們通過api設定該topology的併發度

builder.setbolt("

splitbolt

" , new wordsplit(),2).shufflegrouping("

readspout

" );

我們設定並行度為2的時候,意味著有2個 wordsplit例項,而storm會分配2個executer來分別執行乙個例項,表示有兩個wordsplit執行緒同時執行分詞操作,與此同時,我們也可以給wordspout和wordcount設定並行度。

wordspout:在wordcount案例中,簡單使用的是乙個文字檔案,如果我們給wordspout設定併發度為10,那麼就會有10個wordspout去爭搶讀取該文字檔案,最終導致我們的結果是實際的10倍,類似多執行緒售賣火車票。這實際上就是執行緒安全的問題,因為我們的資料來源無法保證一行資料只被讀取一次。

wordcounter:wordcounter是乙個彙總的bolt,統計個每個單詞出現的次數,如果我們將其併發度設定為2甚至更高,最終會導致每個wordcounter例項的統計的只是實際結果的一部分,因此也是不合適的,一般情況下,我們如果我們的topology中的最後乙個bolt如果是彙總型的,併發度一般都設定預設為1。

storm裡面有6種型別的stream grouping:

(1) shuffle grouping: 隨機分組, 隨機派發stream裡面的tuple, 保證每個bolt接收到的tuple數目相同。輪詢,平均分配。

(2) fields grouping:按欄位分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的bolts, 而不同的userid則會被分配到不同的bolts。

(3)all grouping: 廣播傳送, 對於每乙個tuple, 所有的bolts都會收到。

(4)global grouping: 全域性分組, 這個tuple被分配到storm中的乙個bolt的其中乙個task。再具體一點就是分配給id值最低的那個task。

(5)non grouping: 不分組, 這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和shuffle grouping是一樣的效果,不平均分配。

(6)direct grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味著訊息的傳送者舉鼎由訊息接收者的哪個task處理這個訊息。 只有被宣告為direct stream的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用emitdirect方法來發射。訊息處理者可以通過topologycontext來或者處理它的訊息的taskid(outputcollector.emit方法也會返回taskid) 

程式設計方式配置並行程度

-------------------------

(1) 設定工作程序數

config.setnumworkers(2); // 該topology由兩個worker(程序)執行,在分布式中,兩個程序均衡的分別在storm集群的各個節點

(2) 設定executor數

topologybuiler builder = ... ;

bulder.setspout(..,3); //設定spout的執行執行緒數,表示該spout由三條執行緒來執行

bulder.setbolt(..,3); //設定bolt的執行執行緒數,表示該bolt由三條執行緒來執行

(3)task數

builder.setspout(..,2).setnumtasks(3); //設定spout的task數,表示該spout的兩條執行緒來執行其3個任務,其中一條執行緒執行兩個task,另外乙個執行1個task,這說明task在executor內部是序列執行的

builder.setbolt(...,1).setnumtasks(3); //設定bolt的task數,表示該bolt的一條執行緒執行其3個task任務

預設情況下,如果沒有指定task數量,每個執行執行緒執行乙個task,

併發程度 = spout's task + bolt's task.

Storm之併發機制

為了提高storm的並行能力,通常需要設定並行。1.1 worker 程序 設定worker程序數 config.setnumworkers int workers 1.2 executor 執行緒 設定executor執行緒數 topologybuilder.setspout string id,...

storm理解(未看)

在2011年storm開源之前,由於hadoop的火紅,整個業界都在喋喋不休地談論大資料。hadoop的高吞吐,海量資料處理的能力使得人們可以方便地處理海量資料。但是,hadoop的缺點也和它的優點同樣鮮明 延遲大,響應緩慢,運維複雜。有需求也就有創造,在hadoop基本奠定了大資料霸主地位的時候,...

Storm概念理解

組成 topology是storm裡的最高抽象概念,相當於hadoop裡的mapreduce,topology 流轉換圖 由spouts和bolts組成。spout建立stream,stream由無限的tuple 元組 構成。bolts接收spout流出的tuple並進行處理,處理後生成的新的tup...