建立spark 讀取資料

2022-09-09 00:48:34 字數 3456 閱讀 2770

在2.0版本之前,使用spark必須先建立sparkconf和sparkcontext,

不過在spark2.0中只要建立乙個sparksession就夠了,sparkconf、sparkcontext和sqlcontext都已經被封裝在sparksession當中。

在與spark2.0互動之前必須先建立

spark

物件

val spark = sparksession

.builder()

.master(masterurl)

.config("spark.some.config.option", "some-value")

.getorcreate()

當建立好了sparksession,我們就可以配置spark執行相關屬性。比如下面**片段我們修改了已經存在的執行配置選項。

spark.conf.set("spark.sql.shuffle.partitions", 6)

spark.conf.set("spark.executor.memory", "2g")

絕大多數的屬性控制應用程式的內部設定,並且預設值都是比較合理的。下面對這些屬性進行說明:

該屬性沒有預設值,它的含義是你的應用程式的名字,這個名字設定之後將會在web ui上和日誌資料裡面顯示。如果這個屬性沒有設定的話,將會把你應用程式的

main

函式所在類的全名作為應用程式的名稱。在

yarn

環境下,還可以用

--name

或者來設定應用程式的名稱。為了能夠方便地檢視各個應用程式的含義,取乙個好的名字是很重要的。

該屬性沒有預設值。這是spark程式需要連線的集群管理器所在的

url位址。當前的

spark

支援三種集群方式

standalone

、apache mesos

以及yarn

模式。如果這個屬性在提交應用程式的時候沒設定,程式將會通過

system.getenv("master")

來獲取master

環境變數;但是如果

master

環境變數沒有設定,那麼程式將會把

master

的值設定為

local[*]

,之後程式將在本地啟動。

該屬性的預設值是512m。每個

executor

處理器可以使用的記憶體大小之和,跟

jvm的記憶體表示的字串格式是一樣的

(比如:

'512m'

,'2g')

。在早期版本的

spark

,是通過

-xmx

和-xms

來設定的。如果這個值沒有設定,那麼程式將會先獲取

spark_executor_memory

環境變數;如果還沒設定,那麼獲取

spark_mem

環境變數的值;如果這個值也沒設定,那麼這個值將會別設定為

512,

。幾乎所有執行時效能相關的內容都或多或少間接和記憶體大小相關。這個引數最終會被設定到executor的

jvm的

heap

尺寸上,對應的就是

xmx和

xms的值。

預設值是org.apache.spark.serializer.j**aserializer。用於序列化網路傳輸或者以序列化形式快取起來的各種物件的類。預設的

serializer

可以對所有的

j**a

物件進行序列化,但是它的速度十分慢!所以如果速度是影響程式執行的關鍵,你可以將該值設定為

org.apache.spark.serializer.kryoserializer

。在一些情況下,

kryoserializer

的效能可以達到

j**aserializer的10

倍以上,但是相對於

j**aserializer

而言,主要的問題是它不能支援所有的

j**a

物件。當然,使用者可以直接繼承

org.apache.spark.serializer

來實現自己的

serializer

。預設值為空。如果你使用了kryoserializer,就要為

kryo

設定這個類去註冊你自定義的類,該類必須繼承自

kryoregistrator

,實現其中的

registerclasses(kryo: kryo)

即可。預設值為/tmp。用於設定

spark

的快取目錄,包括了

輸出的檔案,快取到磁碟的

rdd資料。最好將這個屬性設定為訪問速度快的本地磁碟。同

hadoop

一樣,可以用逗號分割來設定多個不同磁碟的目錄。需要注意,在

spark 1.0

和之後的版本,這個屬性將會被

spark_local_dirs (standalone, mesos)

或者 local_dirs (yarn)

環境變數替代。

預設值是false。當

sparkcontext

啟動的時候,以

info

日誌級別記錄下有效的

sparkconf

。當建立好sparksession後,就可以讀取資料了

用乙個map來儲存讀取檔案的格式

val options = map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs:")

再讀取hdfs上的資料

val data all=spark.sqlcontext.read.options(options).format("com.databricks.spark.csv").load()

或者val data all=spark.sqlcontext.read.options(options).csv(filepath)

儲存資料在hdfs上

val s**eoption = map("header" -> "true", "delimiter" -> "\t", "path" -> path)

data.repartition(1).write.format("com.databricks.spark.csv").mode(s**emode.overwrite).options(s**eoption).s**e()

讀取的資料建立臨時**

data.createorreplacetempview("groupdata")

可用sparksession.sql對資料進行字段提取,處理

val result = spark.sql("select ip,sum(count) count from groupdata group by ip")

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 parquet hive table 為例 hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertmetastoreparquet 控制,預設為 true。如果設定為 true 會使用 org.apache.s...

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 parquet hive table 為例 hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertmetastoreparquet 控制,預設為 true。如果設定為 true 會使用 org.apache.s...

hbase資料加鹽讀取(spark篇)

未加鹽資料 spark可以使用inputformat outputformat來讀寫hbase表。加鹽以後 需要在rowkey之前加一些字首,否則是查不到資料的。1 我們需要重新寫getsplits方法 從名字我們可以知道是要計算有多少個splits。在hbase中,乙個region對用乙個spli...