Scala 的並行集合

2022-04-11 10:45:17 字數 965 閱讀 7291

當出現kafka單個分割槽資料量很大,但每個分割槽的資料量很平均的情況時,我們往往採用下面兩種方案增加並行度:

l  增加kafka分割槽數量

l  對拉取過來的資料執行repartition

但是針對這種情況,前者的改動直接影響所有使用消費佇列的模型效能,後者則存在乙個shuffle的效能消耗。有沒有既不會發生shuffle,又能成倍提公升效能的方法呢?

/*

在上述場景中存在的情況是,單核資料量很大,但是又由於分割槽數量限制導致多核無法分配到資料。因此如果使用foreachpartition運算元,就可以獲取到每個分割槽的資料集,對這些資料集使用多執行緒並行執行。

*///具體**如下:

rdd.foreachpartition(datas=>

})//經本地測試,該方法有效。但沒有測試複雜的邏輯,如:多個遍歷運算元、kafka場景等

如果spark會優先為每個executor拉取資料,就可以通過設定executor num=kafka分割槽數,然後為每個executor設定多個cpu core的方式實現成倍的處理速度。

經實驗,spark在拉取kafka資料時,不管cpu核數多少,會優先為每個executor分配乙份kafka分割槽,只有當總executor數量以下是我使用10個節點,每個節點分配4個執行緒拉取乙個分割槽數量為10的kafka時,task的分布情況:

可以看到,資料被很好的分散到了十個節點上。並且在這個測試模型中,我使用了並行集合執行累加器操作。可以看到,並行集合並沒有造成資料丟失,而是正常的執行了計算邏輯。

可惜從少量的資料中看不出並行集合帶來的提公升。此外,關於該方案是否適用於複雜邏輯和持久穩定執行,還需要後續觀察。

Scala的集合框架

1.元組 定義方式 val tp nana 1,1.1 特點 集合中的資料可以是不同型別的 最多只能放22個元素 取值 通過角標取值,這裡的角標是從1開始的,元組名稱.角標 tp.1 nana 當出現陣列長度不相同時,報錯 對偶元組 val tp nana 1 兩個元素 拉鍊操作 zip 當出現陣列...

scala 集合型別

iterable 是序列 seq 集 set 對映 map 的特質 序列式有序的集合如陣列和列表 集合可以通過 方法確定對每個物件最多包含乙個 對映包含了鍵值對映關係的集合 列表快取 使用listbuffer代替list 另乙個理由是為了避免棧溢位的風險 陣列快取 arraybuffer需要先從可變...

Scala基礎學習 scala集合 005

定長陣列 println test val a new array string 5 a.length a 0 hello 賦值 a 1 取值 b 1 flink 可以修改值,並沒有修改指標 val c array 1,2,3,4,5,6 c.mkstring 轉換為字串 c.mkstring c....