Trident exactly once實現原理

2021-08-04 04:32:26 字數 3882 閱讀 2221

為了實現exactly-once,storm0.7.0開始支援transactional toplogy(事務topology),也是微批處理架構,但目前已經不再維護(基本沒有人用),功能完全被trident所替代。準確的說,trident topology是從transactional topology的基本上發展而來,包括spout和state都延用的transactional topology的思路,最大的改變就是抽象出了stream的概念。

trident中將tuple封裝成batch,每乙個batch提供乙個唯一的txid,資料的傳送、提交、重發都是基於txid,以batch為單位進行。

實現exactly-once的關鍵在於狀態的儲存,就trident而言,包括:

元資料的儲存,資料處理失敗時,能知道要重發是哪些資料

中間狀態的儲存,重發資料時,能敏銳發現資料的狀態是否已經更新過

trident只在用state來做中間狀態儲存的地方確保exactly once,而資料流並不一定要在所有的地方都需要用state,以wordcount為例,最終我們關心的只是單詞的統計結果,中間的read和split並不需要關心,也就不需要儲存state,因此,選擇state的儲存時機是重點。

transactional spouts,也叫事務spout,提供了強一致性,有如下三點保障:

乙個txid對應乙個batch,如果乙個batch被重發,txid不變

任意兩個batch中不會有tuple相同;

每個tuple都會被放到乙個batch中,不會有tuple被漏掉

事務spout的實現是假設訊息中介軟體是可靠的,但如果在重發乙個batch時,正好batch中tuple所以某個分割槽失效,則會導致spout一直卡住,因為為了保證batch完全一致,會一直去嘗試讀取去tuple。為了解決這個缺陷,所以有了opaque transactional spouts。

opaque transactional spouts,也叫不透明事務spout,提供了弱一致性,即每個tuple只在乙個batch中被成功處理,但不保證同乙個txid對應的batch完全一致。當重新去讀取乙個batch的tuple時,不會因為讀取不到某個tuple而卡住。

no-transactional spouts,非事務性spout,不保證對一致性

就以上三種spout,分別有三種state來做狀態儲存

transactional state,儲存txid和value

opaque transactional state,儲存txid、value、prevalue

non-transactional state,不保證exactly once

yes表示可以實現exactly-once的組合

trident spout實際是乙個簡單的topology結構,spout包含兩個內部介面:coordinator(協調者)和emitter(訊息傳送者),這兩組介面將由三個執行器來執行

masterbatchcoordinator(mbc):主協調者,coordinator的執行器之一,也是實際的spout,負責的流的管理,以及控制新事務的產生

tridentspoutcoordinator(tsc):另乙個協調者,也是coordinator的執行器之一,實際為bolt,主要負責元資料的封裝

tridentspoutexecutor(tse):訊息的傳送者,emitter的執行器,從資料來源裡根據tsc發來的元資料讀出實際資料,傳送出去

通過下圖可以更清楚看到三個執行器的關係:

乙個batch中tuple的數量並不能直接做到精確控制,主要受資料量的影響,也可以通過配置topology.max.spout.pending的值(預設是1),來增加併發。

可以增加併發,當然也可以限制batch產生的速度。在batch數小於topology.max.spout.pending的情況下,mbc至少會等待trident batch interval的時間(預設是500ms)才會產生乙個新的batch,關於這個引數,官方建議設定為正常的端到端處理時間的一半左右 —— 也就是說如果需要花費 600 ms 的時間處理乙個batch,那麼就可以每將此引數設定為300ms

trident 會按照txid的大小來順序更新 batch 的狀態,也就是說txid=3的batch必然在txid=2的batch之後進行更新。

主要分析一下transactional stateopaque transactional state的實現原理,都是通過txid來判斷資料是否已經處理過,不同之處在於,當txid為重發資料時,transactional state直接忽略此次value更新,而opaque transactional state是將上次處理的值與重發後的值進行combina後更新value,以wordcount為例,分別說明兩者的儲存過程。

假如正在處理的batch的txid=3,包含tuple為:

["man"]["man"]["dog"]
庫中已存入如下結果:

man => [value=3, txid=1]

dog => [value=4, txid=3]

庫中單詞「man」的txid是1,但當前的txid是3,所以可以確定當前batch中的「man」還沒有更新過,可以放心的給count加2並更新txid為3.

與此同時,庫中單詞「dog」的txid和當前的txid是相同的,表明當前batch中的」dog」已經更新過,因此要可以跳過這次更新。此次更新後,資料庫中的資料如下:

man => [value=5, txid=3]

dog => [value=4, txid=3]

以上是transactional state狀態更新過程,前提是每個batch重發時,所包含的tuple都是一致的,但如果這個batch在重發的過程中,讀取訊息中介軟體時,某些區分也失效,則batch可能並不完整transactional state不能保證exactly once,但opaque transactional spout可以。

使用opaque transactional state儲存時,庫中除了儲存value和txid以外,還會存prevalue(上次處理後的值),以更新單詞man為例,如設man在庫中已經儲存如下:

man => [value=5,prevalue=3, txid=3]
若新的batch的txid為4,單詞man出現3次,則正常更新,將value的值覆蓋到prevalue,同時value加上新增的值,更新後如下:

man => [value=8,prevalue=5, txid=4]
若新的batch的txid為3,單詞man出現3次,則表示此txid之前已經處理過,但不能確認單詞man出現的次數和上次是否一致,更新時,prevalue的值不變,value的值更新成prevalue + 3,更新後的結果為:

man => [value=6,prevalue=3, txid=3]

ConcurrentHashMap實現原理

concurrenthashmap實現原理 在jdk1.7中 concurrenthashmap是通過segment陣列 hashentry陣列 單鏈表的結構進行儲存資料。segment陣列中存放的是hashentry陣列的首位址,hashentry中存放的是乙個單鏈表 首節點位址 put 我們通過...

ConcurrentHashMap 實現原理

由於hashmap是乙個執行緒不安全的容器,主要體現在容量大於總量 負載因子發生擴容時會出現環形鍊錶從而導致死迴圈。因此需要支援執行緒安全的併發容器concurrenthashmap。如圖所示,是由segment陣列 hashentry陣列組成,和hashmap一樣,仍然是陣列加鍊表組成。concu...

iOS NSDictionary 字典 實現原理

1.nsdictionary 字典 是使用 hash表來實現key和value之間的對映和儲存的,hash函式設計的好壞影響著資料的查詢訪問效率。void setobject id anobject forkey id akey 2.objective c 中的字典 nsdictionary 底層其...