在Hadoop上發布spark作業

2021-10-02 04:25:37 字數 3738 閱讀 9636

這次的例子是計算航空公司的平均延遲時間,並畫圖

直接上**:

import csv

import matplotlib.pyplot as plt

import matplotlib as mpl

mpl.use(

"tkagg"

)# use tkagg to show figures

from stringio import stringio

from datetime import datetime

from collections import namedtuple

from operator import add, itemgetter

from pyspark import sparkconf, sparkcontext

"flight delay analysis"

date_fmt =

"%y-%m-%d"

time_fmt =

"%h%m"

fields =

('date'

,'airline'

,'flightnum'

,'origin'

,'dest'

,'dep'

,'dep_delay'

,'arv'

,'arv_delay'

,'airtime'

,'distance'

)flight = namedtuple(

'flight'

, fields)

defparse

(row)

:# 將每一行解析為乙個元祖

row[0]

= datetime.strptime(row[0]

, date_fmt)

.date(

) row[5]

= datetime.strptime(row[5]

, time_fmt)

.time(

) row[6]

=float

(row[6]

) row[7]

= datetime.strptime(row[7]

, time_fmt)

.time(

) row[8]

=float

(row[8]

) row[9]

=float

(row[9]

) row[10]

=float

(row[10]

)return flight(

*row[:11

])defsplit

(line)

:# 將一行內容作為檔案給csv.reader

reader = csv.reader(stringio(line)

)return reader.

next()

defplot

(delays)

: airlines =

[d[0

]for d in delays]

minutes =

[d[1

]for d in delays]

index =

list

(xrange

(len

(airlines)))

fig, axe = plt.subplots(

) bars = axe.barh(index, minutes)

for idx, air,

minin

zip(index, airlines, minutes):if

min>0:

bars[idx]

.set_color(

'#d9230f'

) axe.annotate(

"%0.0f min"

%min

, xy=

(min+1

, idx+

0.5)

, va=

'center'

)else

: bars[idx]

.set_color(

'#469408'

) axe.annotate(

"%0.0f min"

%min

, xy=

(min+1

, idx+

0.5)

, va=

'center'

) ticks = plt.yticks(

[idx+

0.5for idx in index]

, airlines)

xt = plt.xticks()[

0]plt.xticks(xt,

[' ']*

len(xt)

) plt.grid(axis=

'x', color =

'white'

, linestyle=

'-')

plt.title(

"total minutes delayed per airline"

) plt.show(

)def

main

(sc)

:# 開啟查詢airline全程的檔案,作為乙個字典用於查詢

airlines =

dict

(sc.textfile(

"flight/airlines.csv").

map(split)

.collect())

# 將這個字典傳送給所有worker節點

airline_lookup = sc.broadcast(airlines)

# 開啟航班資訊

flights = sc.textfile(

"flight/flights.csv").

map(split)

.map

(parse)

# 計算延誤時間

delays = flights.

map(

lambda f:

(airline_lookup.value[f.airline]

,add(f.dep_delay, f.arv_delay)))

# 對同個航空公司的延誤時間相加

delays = delays.reducebykey(add)

.collect(

)# 根據延誤時間排序

delays =

sorted

(delays, key=itemgetter(1)

)for d in delays:

print

"%0.0f minutes delayed\t%s"

%(d[1]

, d[0]

)# 畫圖

plot(delays)

if __name__==

"__main__"

:# spark配置

conf = sparkconf(

).setmaster(

"local[*]"

) sc = sparkcontext(conf=conf)

main(sc)

就可以執行了

Spark灰度發布在十萬級節點上的實踐

持續整合是指,及時地將最新開發的且經過測試的 整合到主幹分支中。持續整合的優點 目前主流的 管理工具有,github gitlab等。本文所介紹的內容中,所有 均託管於私有的 gitlab 中。鑑於 jenkins 幾乎是 ci 事實上的標準,本文介紹的 spark ci cd u0026amp c...

Spark灰度發布在十萬級節點上的實踐

持續整合是指,及時地將最新開發的且經過測試的 整合到主幹分支中。持續整合的優點 目前主流的 管理工具有,github gitlab等。本文所介紹的內容中,所有 均託管於私有的 gitlab 中。鑑於 jenkins 幾乎是 ci 事實上的標準,本文介紹的 spark ci cd u0026amp c...

Spark灰度發布在十萬級節點上的實踐

持續整合是指,及時地將最新開發的且經過測試的 整合到主幹分支中。持續整合的優點 目前主流的 管理工具有,github gitlab等。本文所介紹的內容中,所有 均託管於私有的 gitlab 中。鑑於 jenkins 幾乎是 ci 事實上的標準,本文介紹的 spark ci cd u0026amp c...