Spark學習之路

2021-09-11 18:28:55 字數 3007 閱讀 1180

概念

spark可以分為1個driver和若干個executor,通過sparkcontext連線spark集群、建立rdd、累加器、廣播變數,簡單可以認為sparkcontext是spark程式的根本

driver會把計算任務分成一系列小的task,然後送到executor執行,executor之間可以通訊,在每個executor完成子的task後,所有的資訊會被傳回

spark中的所有處理和計算任務會被組織成一系列resilient distributed dataset(彈性分布式資料集)上的transformations和actions

rdd是乙個包含諸多元素、被劃分到不同節點上進行並行處理的資料集合。可以將rdd持久化到記憶體中,正陽可以有效地在並行操作中服用。在節點發生錯誤時rdd可以自動恢復

rdd 就像numpy array或者 pandas series,可以視作乙個有序的item集合。只不過這些item並不存在driver的記憶體中,而是被分割成很多個partitions,每個partition的資料存在於集群的executors的記憶體中

初始化

import pyspark

from pyspark import sparkcontext

from pyspark import sparkconf

sc = sparkcontext.getorcreate(conf)

#把1-6 劃分為不同的partition分發到不同的節點上

rdd = sc.parallelize([1,2,3,4,5,6])

檢視分割槽情況

rdd.getnumpartitions()

檢視分割槽狀況

rdd.glom().collect()

初始化rdd之檔案

import os

cwd = os.getcwd()

#spark一般預設你的路徑是指向dhfs的,如果要從本地讀取檔案的化,給乙個file://開頭的全域性路徑

rdd = sc.textfile("file://"+cwd+"/names/yob1880.txt")

rdd.first()

#spark會將每一行資料讀成乙個item

初始化rdd之資料夾

rdd = sc.wholetextfile('file://'+cwd+"/names")

#會將檔案生成(『檔案路徑』,'檔案內容')樣式的item

rdd transformation操作

rdd間操作

特點

spark 的乙個核心概念時惰性計算,當你把乙個rdd轉換成另乙個的時候,這個轉換不會直接執行。 spark會把他們先記在心裡,等到真的需要拿到轉換結果時,才會重新組織transformation。因此就有了action,即將transformation生效

action

快取

有時候我們需要重複用到某個transform序列得到的rdd結果,但是一遍遍重複計算顯然是有開銷的,所以我們可以通過乙個叫做cache()的操作把他暫時儲存在記憶體中

針對複雜結構的transformation 和 action

以pair rdd為例:

構建dataframe

首先構建sparksession

from pyspark.sql import sparksession
建立dataframe

df = spark.read.json('data/people.json')

df.show()

dataframe 操作

df.printschema()

df.select('name').show()

df.select(['name','age']).show()

df.select([df['name'],'df['age']+1).show()

df.filter(df['age']>2).show()

df.groupby('age').count().show()

spark sql

df.createorreplacetempview('people')

sqldf = spark.sql('select * from people')

sqldf.show()

反射推斷

from pyspark.sql import row

sc = spark.sparkcontext

lines = sc.textfile('data/prople.txt')

parts = lines.map(lambda l : l.split(','))

people = parts.map(lambda p :row(name=p[0],age=int(p[1])))

schemapeople = spark.createdataframe(people)

schemapeople.createorreplacetempview('people')

直接指定字段

Spark學習之路(二)

spark的邏輯處理流程主要分為四個部分 rdd是如何生成的?spark對程式中的每乙個資料進行操作,比如transformation操作 map 就會生成新的rdd,對於複雜的操作 join 則會生成多個rdd 新的rdd分割槽數量是如何得到的?使用者和parent rdd兩者共同決定新的rdd分...

Spark學習之路 官方文件 簡單

英文原文 中文文件 1.1 rdd programming guide 英文原文 中文文件 1.2 spark sql,dataframes and datasets guide 英文原文 中文文件 看完官方的技術文件實踐後,自己可以試著實現spark的三種執行方式 spark2.1.1中用各種模式...

開啟Hadoop和Spark的學習之路

hadoop是乙個由apache 會所開發的分布式系統基礎架構。使用者可以在不了解分布式底層細節的情況下,開發分布式程式。充分利用集群的威力進行高速運算和儲存。hadoop實現了乙個分布式檔案系統 hadoop distributed file system 簡稱hdfs。hdfs有高容錯性的特點,...