PySpark DataFrame 新增自增 ID

2021-09-30 00:15:18 字數 2963 閱讀 7148

在用 spark 處理資料的時候,經常需要給全量資料增加一列自增 id 序

號,在存入資料庫的時候,自增 id 也常常是乙個很關鍵的要素。

在 dataframe 的 api 中沒有實現這一功能,所以只能通過其他方式實

現,或者轉成 rdd 再用 rdd 的 zipwithindex 運算元實現。

下面呢就介紹三種實現方式。

建立 dataframe 物件

from pyspark.sql import sparksession

spark = sparksession.builder.getorcreate()

df = spark.createdataframe(

[ ,,,

,,])

df.show()

輸出:

+---+------+

|age| name|

+---+------+

| 18| alice|

| 22| sitoi|

| 22|****ao|

| 7| tom|

| 17| de|

+---+------+

1**,monotonically_increasing_id()** 函式

使用自帶函式 monotonically_increasing_id() 建立,由於 spark 會有分割槽,所以生成的 id 保證單調增加且唯一,但不是連續的。

優點:對於沒有分割槽的檔案,處理速度快。

缺點:由於 spark 的分割槽,會導致,id 不是連續增加

from pyspark.sql.functions import monotonically_increasing_id             

tempdf_index=date.withcolumn("idd_1",monotonically_increasing_id())

tempdf_index.show()

輸出:

+---+------+-----------+

|age| name| id|

+---+------+-----------+

| 18| alice| 8589934592|

| 22| sitoi|17179869184|

| 22|****ao|25769803776|

| 7| tom|42949672960|

| 17| de|51539607552|

+---+------+-----------+

如果讀取本地的單個 csv 檔案 或 json 檔案,id 會是連續增加且唯一的。

2,方法二,使用視窗函式

利用視窗函式:設定視窗函式的分割槽以及排序,因為是全域性排序而不是分組排序,所有分割槽依據為空,排序規則沒有特殊要求也可以隨意填寫

優點:保證 id 連續增加且唯一。

缺點:執行速度滿,並且資料量過大會爆記憶體,需要排序,會改變原始資料順序

from pyspark.sql.functions import row_number

from pyspark.sql.window import window

spec = window.partitionby().orderby("age")

df = df.withcolumn("id", row_number().over(spec))

df.show()

輸出:

+---+------+---+

|age| name| id|

+---+------+---+

| 7| tom| 1|

| 17| de| 2|

| 18| alice| 3|

| 22| sitoi| 4|

| 22|****ao| 5|

+---+------+---+

3,rdd運算元的zipwithindex函式

轉成 rdd 再用 rdd 的 zipwithindex 運算元實現

優點:保證 id 連續 增加且唯一。

缺點:執行速度慢。

from pyspark.sql import sparksession

from pyspark.sql.functions import monotonically_increasing_id

from pyspark.sql.types import structfield, longtype

spark = sparksession.builder.getorcreate()

schema = df.schema.add(structfield("id", longtype()))

rdd = df.rdd.zipwithindex()

def flat(l):

for k in l:

if not isinstance(k, (list, tuple)):

yield k

else:

yield from flat(k)

rdd = rdd.map(lambda x: list(flat(x)))

df = spark.createdataframe(rdd, schema)

df.show()

輸出:

+---+------+---+

|age| name| id|

+---+------+---+

| 18| alice| 0|

| 22| sitoi| 1|

| 22|****ao| 2|

| 7| tom| 3|

| 17| de| 4|

+---+------+---+

自增 i與i 自減 i與i

a 前置自增 變數值先 1,再計算表示式的值 前自增,先增再用 a 後置自增 先計算表示式的值 變數值後 1 後自增,先用再增 a 後置自減 先計算表示式的值 變數值後 1 後自減,先用再減 a 前置自減 變數值先 1,再計算表示式的值 前自減,先減再用 當自增自減遇上邏輯與和邏輯或 因邏輯與一非即...

自增(i ) 自減(i )運算子的學習筆記

自增自減運算子語法 自增運算子 使運算元的值加1,其運算元必須為可變左值 可簡單地理解為變數 對於自增就是加1這一點,eric想大家都不會有什麼疑問。問題在於 可以置於運算元前面,也可以放在後面,如 i i i表示,i自增1後再參與其它運算 而i 則是i參與運算後,i的值再自增1。自減運算子 與之類...

python中沒有i 自增運算

在python中是沒有自增和自減的,因此在python中用 i i 1和 i i 1 實現效果即可。因為python的模型規定,數值物件是不可改變的。i i 1 相當於重新建立了乙個變數 i 而不是改變了 i 中的數值。舉個例子 def main i 1 j 1print id i print id...