《深入理解Spark》之join和資料傾斜問題

2021-08-15 09:18:15 字數 3617 閱讀 1101

package com.lyzx.day34

import org.apache.spark._

class t2

/*** 對於join之前進行co-partition和不進行co-partition的效率測試

* 實測join之前進行co-partition操作的效率高於直接join

* 注意:資料量小的時候情況不一定

* @param sc

*/def f2(sc:sparkcontext): unit =

// println()

// for(v <- itr) yield v

// }).collect()

//// println("index2="+index)

// while(itr.hasnext)

// for(v <- itr) yield v

// }).collect()

val start1 = system.currenttimemillis()

//直接做join並記錄時間

println("start1="+start1)

val joinrdd = pairrdd1.join(pairrdd2)

println("pairrdd1,index:"+index)

while(itr.hasnext)

for(v <- itr) yield v

})// .foreach(x=>print())

println("joinrdd.partitions.length:"+joinrdd.partitions.length)

val end1 = system.currenttimemillis()

println("耗時="+(end1-start1)) //2563

val start2 = system.currenttimemillis()

val group1 = pairrdd1.groupbykey()

val group2 = pairrdd2.groupbykey()

// println("#############################")

// 檢視資料,除錯使用

println("group1-index1="+index)

while(itr.hasnext)

println()

for(v <- itr) yield v

}).collect()

println("group2-index1="+index)

while(itr.hasnext)

println()

for(v <- itr) yield v

}).collect()

println("start2="+start2)

group1.join(group2)

println("pairrdd2,index:"+index)

while(itr.hasnext)

for(v <- itr) yield v

}).foreach(x=>print())

val end2 = system.currenttimemillis()

println("耗時:"+(end2-start2)) //2000

} /**

* partitionby的使用

* @param sc

*/def f3(sc:sparkcontext): unit =

println()

for(v <- itr) yield v

}).foreach(x=>print("-"+x))

println("###################################")

rdd1.partitionby(new org.apache.spark.hashpartitioner(rdd1.partitions.length))

print("index:"+index)

while(itr.hasnext)

println()

for(v <- itr) yield v

}).foreach(x=>print("-"+x))

} /**

* 關於spark的並行度的問題

* 修改並行度的方式

* 方法1:直接在**中以local[*]的形式

* 方法2:conf.set("spark.default.parallelism","4")

* 方法3:val rdd = sc.parallelize(10 to 200,numpartitions)

* 優先順序: 方法3>方法2>方法1

* @param sc

*/def f4(sc:sparkcontext): unit =

/*** 關於filter後資料傾斜的問題

* 如果在使用filter運算元後發現資料傾斜問題

* 那麼可以通過將rdd[k]=>rdd[k,v]形式的rdd在使用預設的 hashpartitioner進行分割槽後

* 在使用map運算元轉換回來

** 如果rdd[t] 其中t是自定義物件可以使用自定義分割槽器進行分割槽

* 注意:網上有人說使用filter+coalesce運算元可以解決問題

* 經過我實際測試發現並不能完全解決資料傾斜問題

** @param sc

*/def f5(sc:sparkcontext): unit =

println()

for(v <- itr) yield v

})rdd

.filter(_>10)

.map(x=>(x,1))

.partitionby(new hashpartitioner(3))

.map(x=>x._1)

println("index2:"+index+" ")

while(itr.hasnext)

println()

for(v <- itr) yield v

}).collect()

} /**

* 使用自定義的分割槽器對資料進行分割槽

* @param sc

*/def f6(sc:sparkcontext): unit =

// for(v <- itr) yield v

// }).collect()

prdd

.filter(x=>x.age > 15)

.map(x=>(x,1))

.partitionby(new no_1_partitioner(3))

.map(x=>x._1)

while(itr.hasnext)

for(v <- itr) yield v

}).collect()

}}object t2

}case class person(name:string,var age:int,id:int)

}class no_1_partitioner(num:int) extends partitioner

}

深入理解SQL表連線(join)

關聯式資料庫中最重要的兩個概念,當屬表連線和聚合。表連線將一條資料分開成多條,表聚合將多條合成一條。這一分一合,形成了關聯式資料庫強大的邏輯表達能力,這篇文章講表連線,關於聚合請移步 深入理解sql分組聚合 內連線 外連線 左外連線 右外連線 全連線 交叉連線 自然連線 這麼多種連線方式,你是不是已...

深入理解C語言 深入理解指標

關於指標,其是c語言的重點,c語言學的好壞,其實就是指標學的好壞。其實指標並不複雜,學習指標,要正確的理解指標。指標也是一種變數,占有記憶體空間,用來儲存記憶體位址 指標就是告訴編譯器,開闢4個位元組的儲存空間 32位系統 無論是幾級指標都是一樣的 p操作記憶體 在指標宣告時,號表示所宣告的變數為指...

mysql 索引深入理解 深入理解MySql的索引

為什麼索引能提高查詢速度 先從 mysql的基本儲存結構說起 mysql的基本儲存結構是頁 記錄都存在頁裡邊 各個資料頁可以組成乙個雙向鍊錶每個資料頁中的記錄又可以組成乙個單向鍊錶 每個資料頁都會為儲存在它裡邊兒的記錄生成乙個頁目錄,在通過主鍵查詢某條記錄的時候可以在頁目錄中使用二分法快速定位到對應...