向Spark的DataFrame增加一列資料

2021-09-22 08:26:15 字數 4783 閱讀 5231

先說個題外話,如何給hive表增加乙個列,並且該把該列的所有欄位設為』china』?

如果僅僅是增加一列倒是很簡單:

alter table test add columns(flag string)
可要把這個flag欄位全部設定為china,看起來的確是有點難度,因為往hive表中裝載資料的唯一途徑就是使用一種「大量」的資料裝載操作(如何往hive表載入資料請參考),這個時候,如果資料集中本來就沒有flag對應的資料,難道非要手動把china新增上去?這種情況,可以通過靜態分割槽就能夠解決:

load data local inpath '/data/test.txt' overwrite into table test partition (flag = 'china')
有人說,這不扯淡嗎?如果這個china欄位,並不是我們經常需要訪問的字段,何須作為分割槽欄位呢?的確是這樣的,這個時候還可以通過下面的方式來解決這個問題:

insert into table test1  select id, name,'china' as flag from test;
好了步入正題:如何向spark的dataframe增加一列資料

準備資料集:

張三,23

李四,24

王五,25

趙六,26

程式入口sparksession和載入資料**這裡不再描述:

val spark = sparksession

.builder()

.master(master = "local[*]")

.getorcreate()

import spark.implicits._

val df = spark.read.textfile("./data/clm")

.map(_.split(","))

.map(x => (x(0), x(1)))

.todf("name", "age")

.cache()

通過新增列或替換具有相同名稱的現有列來返回新的資料集

column的表示式只能引用此資料集提供的屬性。 新增引用其他資料集的列是錯誤的

新的列只能通過現有列轉換得到,這個就有點侷限,不過也能解決一部分問題:

比如,我想再增加一列為所有age增加1作為新的一列:

df.withcolumn("new_age", col = df("age") + 1).show()
結果:

+----+---+-------+

|name|age|new_age|

+----+---+-------+

|張三| 23| 24.0|

|李四| 24| 25.0|

|王五| 25| 26.0|

|趙六| 26| 27.0|

+----+---+-------+

那麼如果我想像前言中做那樣的操作怎麼辦?

lit函式的作用:creates a [[column]] of literal value. 建立[[column]]的字面量值

df.withcolumn("class",lit("一班")).show()
結果:

+----+---+-----+

|name|age|class|

+----+---+-----+

|張三| 23| 一班|

|李四| 24| 一班|

|王五| 25| 一班|

|趙六| 26| 一班|

+----+---+-----+

df.createtempview(viewname = "view1")

import spark.sql

sql(sqltext = "select name,age,'一班' as class from view1").show()

結果:

+----+---+-----+

|name|age|class|

+----+---+-----+

|張三| 23| 一班|

|李四| 24| 一班|

|王五| 25| 一班|

|趙六| 26| 一班|

+----+---+-----+

sql(sqltext = "select name,age,concat('','一班') as class from view1").show()
結果:

+----+---+-----+

|name|age|class|

+----+---+-----+

|張三| 23| 一班|

|李四| 24| 一班|

|王五| 25| 一班|

|趙六| 26| 一班|

+----+---+-----+

該函式官網的描述是:乙個列表示式,用於生成單調遞增的64位整數。但是請注意:這個自增列在分區內是連續的,但是分區間並不連續

先來個簡單的使用案例:

import org.apache.spark.sql.functions._

df.withcolumn("id", monotonically_increasing_id()).show()

結果:

+----+---+---+

|name|age| id|

+----+---+---+

|張三| 23| 0|

|李四| 24| 1|

|王五| 25| 2|

|趙六| 26| 3|

+----+---+---+

但是,monotonically_increasing_id() 方法生成單調遞增僅僅是針對同乙個分割槽,儘管不同分割槽之間生成的id都是不同的,可不同分區間id不連續,也會造成使用上面的困難,下面進行詳細講解

df.repartition(2)

.withcolumn("id", monotonically_increasing_id())

.show()

結果:

+----+---+----------+

|name|age| id|

+----+---+----------+

|李四| 24| 0|

|趙六| 26| 1|

|張三| 23|8589934592|

|王五| 25|8589934593|

+----+---+----------+

顯然,可以看出李四和趙六為同一分割槽,張三和王五為另乙個分割槽,這兩個分區間id雖然不同,但是並不連續

val tmprdd: rdd[(row, long)] = df.rdd.repartition(2).zipwithindex()

val record: rdd[row] = tmprdd.map(x => )

val schema = new structtype().add("name", "string")

.add("age", "string")

.add("id", "long")

spark.createdataframe(record, schema).show()

結果:

+----+---+---+

|name|age| id|

+----+---+---+

|張三| 23| 0|

|王五| 25| 1|

|李四| 24| 2|

|趙六| 26| 3|

+----+---+---+

val w = window.orderby("age")

df.repartition(2).withcolumn("id", row_number().over(w)).show()

結果:

+----+---+---+

|name|age| id|

+----+---+---+

|張三| 23| 1|

|李四| 24| 2|

|王五| 25| 3|

|趙六| 26| 4|

+----+---+---+

df.repartition(1)

.withcolumn("id", monotonically_increasing_id())

.repartition(2)

.show()

結果:

+----+---+---+

|name|age| id|

+----+---+---+

|張三| 23| 0|

|李四| 24| 1|

|王五| 25| 2|

|趙六| 26| 3|

+----+---+---+

pandas的資料結構之DataFrame

dataframe是乙個 型的資料結構,它含有一組有序的列,每列可以是不同資料型別的資料。dataframe既有行索引也有列索引,可以將它看作為乙個由series組成的字典 共用同乙個索引 dataframe中的資料是以乙個或多個二維塊儲存的,而不是列表 字典或別的一維資料結構。a 通過字典建立,字...

pandas中的資料結構 DataFrame

型的資料結構 修改某一行 frame.values 0 d 2 frame name1 pay2 x d 2 y b 6000 z c 9000 修改某一行的值 frame.values 1 1 9000 frame name1 pay2 x d 2 y b 9000 z c 9000 獲取某行資料...

關於Spark和Spark的學習資料

hadoop社群依然發展迅速,2014年推出了2.3,2.4,2.5 的社群版本,比如增強 resource manager ha,yarn rest api,acl on hdfs,改進 hdfs 的 web ui hadoop roadmap 根據我的觀察,主要更新在yarn,hdfs,而map...