PySpark withColumn更新或新增列

2021-10-19 07:56:02 字數 4596 閱讀 4461

原文:

pysparkwithcolumn()是dataframe的轉換函式,用於更改或更新值,轉換現有dataframe列的資料型別,新增/建立新列以及多核。在本文中,我將使用withcolumn()示例向您介紹常用的pyspark dataframe列操作。

首先,讓我們建立乙個要使用的dataframe。

data =[(

'james',''

,'smith'

,'1991-04-01'

,'m'

,3000),

('michael'

,'rose',''

,'2000-05-19'

,'m'

,4000),

('robert',''

,'williams'

,'1978-09-05'

,'m'

,4000),

('maria'

,'anne'

,'jones'

,'1967-12-01'

,'f'

,4000),

('jen'

,'mary'

,'brown'

,'1980-02-17'

,'f',-

1)]columns =

["firstname"

,"middlename"

,"lastname"

,"dob"

,"gender"

,"salary"

]df = spark.createdataframe(data=data, schema = columns)

通過在dataframewithcolumn()上使用pyspark,我們可以強制轉換或更改列的資料型別。為了更改資料型別,您還需要將cast()函式與withcolumn()一起使用。下面的語句將「工資」列的資料型別從string更改integer為。

df2 = df.withcolumn(

"salary"

,col(

"salary"

).cast(

"integer"))

df2.printschema(

)

dataframe的pysparkwithcolumn()函式也可以用於更改現有列的值。為了更改值,將現有的列名作為第乙個引數傳遞,並將要分配的值作為第二個引數傳遞給withcolumn()函式。請注意,第二個引數應為columntype。

df3 = df.withcolumn(

"salary"

,col(

"salary")*

100)

df3.printschema(

)

此**段將「 salary」的值乘以100,並將其值更新回「 salary」列。

要新增/建立新列,請使用您希望新列成為的名稱指定第乙個引數,並通過對現有列執行操作來使用第二個引數來分配值。

df4 = df.withcolumn(

"copiedcolumn"

,col(

"salary")*

-1)df3.printschema(

)

此**段通過將「工資」列乘以值-1來建立新列「 copiedcolumn」。

為了建立新列,請將所需的列名傳遞給withcolumn()轉換函式的第乙個引數。確保此新列尚未出現在dataframe上(如果顯示的話)會更新該列的值。

在下面的**片段中,使用lit()函式將常量值新增到dataframe列。我們還可以鏈結以新增多個列。

df5 = df.withcolumn(

"country"

, lit(

"usa"))

df5.printschema(

)df6 = df.withcolumn(

"country"

, lit(

"usa"

)) \

.withcolumn(

"anothercolumn"

,lit(

"anothervalue"))

df6.printschema(

)

儘管您不能使用withcolumn重新命名列,但我還是想介紹一下,因為重新命名是我們在dataframe上執行的常見操作之一。要重新命名現有列,請使用withcolumnrenamed()dataframe上的函式。

df.withcolumnrenamed(

"gender"

,"***"

) \ .show(truncate=

false

)

使用「放置」功能從dataframe放置特定的列。

df4.drop(

"copiedcolumn"

) \.show(truncate=

false

)

**注意:**請注意,所有這些函式在應用函式後都將返回新的dataframe,而不是更新dataframe。

import pyspark

from pyspark.sql import sparksession

from pyspark.sql.functions import col, lit

from pyspark.sql.types import structtype, structfield, stringtype,integertype

'sparkbyexamples.com'

).getorcreate(

)data =[(

'james',''

,'smith'

,'1991-04-01'

,'m'

,3000),

('michael'

,'rose',''

,'2000-05-19'

,'m'

,4000),

('robert',''

,'williams'

,'1978-09-05'

,'m'

,4000),

('maria'

,'anne'

,'jones'

,'1967-12-01'

,'f'

,4000),

('jen'

,'mary'

,'brown'

,'1980-02-17'

,'f',-

1)]columns =

["firstname"

,"middlename"

,"lastname"

,"dob"

,"gender"

,"salary"

]df = spark.createdataframe(data=data, schema = columns)

df.printschema(

)df.show(truncate=

false

)df2 = df.withcolumn(

"salary"

,col(

"salary"

).cast(

"integer"))

df2.printschema(

)df2.show(truncate=

false

)df3 = df.withcolumn(

"salary"

,col(

"salary")*

100)

df3.printschema(

)df3.show(truncate=

false

) df4 = df.withcolumn(

"copiedcolumn"

,col(

"salary")*

-1)df4.printschema(

)df5 = df.withcolumn(

"country"

, lit(

"usa"))

df5.printschema(

)df6 = df.withcolumn(

"country"

, lit(

"usa"

)) \

.withcolumn(

"anothercolumn"

,lit(

"anothervalue"))

df6.printschema(

)df.withcolumnrenamed(

"gender"

,"***"

) \ .show(truncate=

false

)

df4.drop(

"copiedcolumn"

) \.show(truncate=

false

)

學習愉快!

mysql插入或更新

現有user表,userid為使用者id,做為資料表user的主鍵 由於userid不可以重複,而這裡userid直接作為主鍵。為防止併發操作,插入語句可以這樣設計 不存在userid則插入,否則更新 insert into user userid,nickname,role,createtime,...

KaliLinux新增更新源

路徑 etc apt sources.list 新增以下更新源 官方源 deb kali rolling main non free contrib deb src kali rolling main non free contrib 中科大kali源 deb kali rolling main n...

Kali更新源新增

1.使用vim命令開啟更新源sources.list檔案,命令如下 vim etc apt sources.list 2.按鍵盤a進行對更新源的輸入 3.常見的更新源如下 3.1中科大 deb kali rolling main non free contrib deb src kali rolli...