Storm 中的ack機制

2021-08-17 05:22:22 字數 4215 閱讀 8906

一.ack原理

storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每乙個tuple的tuple樹(因為乙個tuple通過spout發出了,經過每乙個bolt處理後,會生成乙個新的tuple傳送出去)。當acker(框架自啟動的task)發現乙個tuple樹已經處理完成了,它會傳送乙個訊息給產生這個tuple的那個task。

acker的跟蹤演算法是storm的主要突破之一,對任意大的乙個tuple樹,它只需要恆定的20位元組就可以進行跟蹤。

這樣乙個map就可以確定乙個stream,以後每個tuple知道他的祖宗tuple-

8位元組                                    4位元組       8位元組      

id,所以它自然可以算出要通知哪個acker來ack。一般乙個spout對應乙個acker,來自同乙個祖宗tuple的tuple都放在同乙個acker進行處理。

acker跟蹤演算法的原理:acker對於每個sp out-tuple儲存乙個ack-val的校驗值,它的初始值是0,然後每發射乙個tuple或ack乙個tuple時,這個tuple的id就要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了,那麼最後ack-val的值就一定是0。acker就根據ack-val是否為0來判斷是否完全處理,如果為0則認為已完全處理。

要實現ack機制:

1,spout發射tuple的時候指定messageid

2,spout要重寫baserichspout的fail和ack方法

3,spout對發射的tuple進行快取(否則spout的fail方法收到acker發來的messsageid,spout也無法獲取到傳送失敗的資料進行重發),看看系統提供的介面,只有msgid這個引數,這裡的設計不合理,其實在系統裡是有cache整個msg的,只給使用者乙個messageid,使用者如何取得原來的msg貌似需要自己cache,然後用這個msgid去查詢,太坑爹了

3,spout根據messageid對於ack的tuple則從快取佇列中刪除,對於fail的tuple可以選擇重發。

4,設定acker數至少大於0;config.setnumackers(conf, ackerparal);

storm的bolt有bsicbolt和richbolt:

在basicbolt中,basicoutputcollector在emit資料的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。

使用richbolt需要在emit資料的時候,顯示指定該資料的源tuple要加上第二個引數anchor tuple,以保持tracker鏈路,即collector.emit(oldtuple, newtuple);並且需要在execute執行成功後呼叫outputcollector.ack(tuple), 當失敗處理時,執行outputcollector.fail(tuple);

由乙個tuple產生乙個新的tuple稱為:anchoring,你發射乙個tuple的同時也就完成了一次anchoring。

ack機制即,spout傳送的每一條訊息,在規定的時間內,spout收到acker的ack響應,即認為該tuple 被後續bolt成功處理;在規定的時間內(預設是30秒),沒有收到acker的ack響應tuple,就觸發fail動作,即認為該tuple處理失敗,timeout時間可以通過config.topology_message_timeout_secs來設定。或者收到acker傳送的fail響應tuple,也認為失敗,觸發fail動作

二.storm怎麼處理重複的tuple?

因為storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail並重新傳送該tuple,那麼就會有tuple重複計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:

(1)不處理,這也算是種策略。因為實時計算通常並不要求很高的精確度,後續的批處理計算會更正實時計算的誤差。

(2)使用第三方集中儲存來過濾,比如利用mysql,memcached或者redis根據邏輯主鍵來去重。

(3)使用bloom filter做過濾,簡單高效。

問題一:你們有沒有想過如果某乙個task節點處理的tuple一直失敗,訊息一直重發會怎麼樣?

我們都知道,spout作為訊息的傳送源,在沒有收到該tuple來至左右bolt的返回資訊前,是不會刪除的,那麼如果訊息一直失敗,就會導致spout節點儲存的tuple資料越來越多,導致記憶體溢位。

問題二:有沒有想過,如果該tuple的眾多子tuple中,某乙個子tuple處理failed了,但是另外的子tuple仍然會繼續執行,如果子tuple都是執行資料儲存操作,那麼就算整個訊息失敗,那些生成的子tuple還是會成功執行而不會回滾的。

這個時候storm的原生api是無法支援這種事務性操作,我們可以使用storm提供的高階api-trident來做到(具體如何我不清楚,目前沒有研究它,但是我可以它內部一定是根據分布式協議比如兩階段提交協議等)。向這種業務中要保證事務性功能,我們完全可以根據我們自身的業務來做到,比如這裡的入庫操作,我們先記錄該訊息是否已經入庫的狀態,再入庫時查詢狀態來決定是否給予執行。

問題三:tuple的追蹤並不一定要是從spout結點到最後乙個bolt,只要是spout開始,可以在任意層次bolt停止追蹤做出應答。

acker task 元件來設定乙個topology裡面的acker的數量,預設值是一,如果你的topoogy裡面的tuple比較多的話,那麼請把acker的數量設定多一點,效率會更高一點。

調整可靠性 

acker task是非常輕量級的, 所以乙個topology裡面不需要很多acker。你可以通過strom ui(id: -1)來跟蹤它的效能。 如果它的吞吐量看起來不正常,那麼你就需要多加點acker了。

如果可靠性對你來說不是那麼重要— 你不太在意在一些失敗的情況下損失一些資料, 那麼你可以通過不跟蹤這些tuple樹來獲取更好的效能。不去跟蹤訊息的話會使得系統裡面的訊息數量減少一半, 因為對於每乙個tuple都要傳送乙個ack訊息。並且它需要更少的id來儲存下游的tuple, 減少頻寬占用。

有三種方法可以去掉可靠性。

第一是把config.topology_ackers 設定成 0. 在這種情況下, storm會在spout發射乙個tuple之後馬上呼叫spout的ack方法。也就是說這個tuple樹不會被跟蹤。

第二個方法是在tuple層面去掉可靠性。 你可以在發射tuple的時候不指定messageid來達到不跟粽某個特定的spout tuple的目的。

最後乙個方法是如果你對於乙個tuple樹裡面的某一部分到底成不成功不是很關心,那麼可以在發射這些tuple的時候unanchor它們。 這樣這些tuple就不在tuple樹裡面, 也就不會被跟蹤了。

三. 如何關閉ack機制:

1.設定acker個數為0

2.spout傳送資料時不帶上messageid

ack還用於限流作用: 為了避免spout傳送資料太快,而bolt處理太慢,常常設定pending數,當spout有等於或超過pending數的tuple沒有收到ack或fail響應時,跳過執行nexttuple, 從而限制spout傳送資料。

通過conf.put(config.topology_max_spout_pending, pending);設定spout pend數。 四.

理解storm的可靠性的最好的方法是來看看tuple和tuple樹的生命週期, 當乙個tuple被建立, 不管是spout還是bolt建立的, 它會被賦予乙個64位的id,而acker就是利用這個id去跟蹤所有的tuple的。每個tuple知道它的祖宗的id(從spout發出來的那個tuple的id), 每當你新發射乙個tuple, 它的祖宗id都會傳給這個新的tuple。所以當乙個tuple被ack的時候,它會發乙個訊息給acker,告訴它這個tuple樹發生了怎麼樣的變化。具體來說就是:它告訴acker: 我呢已經完成了, 我有這些兒子tuple, 你跟蹤一下他們吧。storm使用一致性雜湊來把乙個spout-tuple-id對應到acker, 因為每乙個tuple知道它所有的祖宗的tuple-id, 所以它自然可以算出要通知哪個acker來ack。

當乙個spout發射乙個新的tuple, 它會簡單的發乙個訊息給乙個合適的acker,並且告訴acker它自己的id(taskid), 這樣storm就有了taskid-tupleid的對應關係。 當acker發現乙個樹完成處理了, 它知道給哪個task傳送成功的訊息。

Storm的可靠性與ack機制

無論是實時處理還是離線處理,都會遇到乙個不可避免的問題是,失敗任務如何重做?storm提供了乙個ack機制。首先來看一下ispout介面的方法。public inte ce ispout extends serializable可以看到,提供了兩個方法ack 和fail 裡面的引數是乙個叫msgid...

storm中,ack與fail資訊的處理

在網上看到了很多有關於storm的資料,不過很多具體的問題還是沒有解答。本文的主要內容主要是進行storm中ack和fail訊息的處理。大家都知道,spout和bolt的實現需要繼承或者實現介面。對於spout來說,有兩個基類以供繼承 basebasicspout和baserichspout。兩個類...

storm中fieldsGroup的機制

說實話storm功能非常強大,但是參考資料是在是太少了,有些只能自己摸索,專案中用到了fieldsgroup,所以就研究一下。fieldsgroup的機制是把declar中暴露定義的fields中的字段進行hash,然後分到不同的bolt中,開始理解有誤,所以一直跑不通。囧 直接上 在第乙個spou...