怎樣使用spark的pipe呼叫外部程式

2021-09-26 05:12:15 字數 2359 閱讀 8203

spark在rdd上提供pipe()方法。spark的pipe()方法可以讓我們使用任意一種語言實現spark作業中的部分邏輯,只要它能讀寫unix標準的流就行。通過pipe(),你可以將rdd中的各元素從標準輸入流中以字串形式讀出,並對這些元素執行任何你需要的操作,然後把結果以字串的形式寫入標準輸出------這個過程就是rdd的轉化操作過程。有了pipe()這個管道我們就可以通過這個管道與r、c++、python以及shell指令碼等程式進行互動,使其能夠進行更快的計算。

本部落格只介紹spark通過pipe()與python,shell進行互動,感興趣的小夥伴可以嘗試一下其他的。

1.首先寫乙個python程式,在本地執行保證沒問題。

import sys

for line in sys.stdin:

d=line.strip().split(',')

if len(d) !=2:

continue

label=d.pop(0)

hit_id=d.pop(0)

features=

print(features)

注意:要用stdin或者raw_input獲取輸入。

2.把python程式寫成指令碼

#!/usr/bin/python

import sys

for line in sys.stdin:

d=line.strip().split(',')

if len(d) !=2:

continue

label=d.pop(0)

hit_id=d.pop(0)

features=

print(features)

注意:指令碼編寫後要新增指令碼的執行許可權

3.編寫spark程式呼叫pipe()

scala> val rdddata=sc.textfile("hdfs://ip/tmp/wordcount.txt")

scala> val scriptpath="/tmp/test/test.py"

scriptpath: string = /tmp/test/test.py

scala> println(rdddata.pipe(scriptpath).collect().tolist)

執行結果如下:

list([('item.id,spark', 1), ('item.id,hive', 1)], [('item.id,hadoop', 1), ('item.id,spark', 1)], [('item.id,zookeeper', 1), ('item.id,kylin', 1)], [('item.id,kylin', 1), ('item.id,hue', 1)], [('item.id,spark', 1), ('item.id,hue', 1)], [('item.id,hadoop', 1), ('item.id,spark', 1)], [('item.id,spark', 1), ('item.id,redis', 1)], [('item.id,spark', 1), ('item.id,hbase', 1)], [('item.id,hive', 1), ('item.id,hbase', 1)])
1.編寫shell指令碼

#!/bin/sh

while read line; do

echo $line | awk ''

done

2.在spark程式中呼叫

scala> val rdddata=sc.textfile("hdfs://ip/tmp/wordcount.txt")

scala> val scriptpath="/tmp/test/test.sh"

scriptpath: string = /tmp/test/test.sh

scala> println(rdddata.pipe(scriptpath).collect().tolist)

list((item.id,spark,1)(item.id,hive,1)(item.id,hive.hadoop,1)(item.id,hadoop,1)(item.id,spark,1)(item.id,zookeeper,1)(item.id,kylin,1)(item.id,kylin,1)(item.id,hue,1), (item.id,spark,1)(item.id,hue,1)(item.id,hadoop,1)(item.id,spark,1)(item.id,spark,1)(item.id,redis,1)(item.id,spark,1)(item.id,hbase,1)(item.id,hive,1)(item.id,hbase,1))

例項 Linux管道pipe的使用

例項 linux管道pipe的使用 moakap總結 函式 include int pipe int filedes 2 描述 pipe 函式建立乙個管道和指向該管道的一對檔案描述符,並且將檔案描述符儲存到檔案描述符陣列filedes中。其中filedes 0 為讀端,filedes 1 為寫端。返...

Spark在Yarn上的效能調優

1 process local 程序本地化 推薦使用 和資料在同乙個 executor 程序中,資料在executor的blockmanager中,效能最好 2 node local 節點本地化 推薦使用 和資料在乙個節點中,主要分兩種情況 i 資料在節點上,task在節點上的executor中執行...

Spark的基本使用

啟動spark shell 開啟命令列或終端 pyspark import pyspark 匯入pyspark 檢視spark context資訊 讀入檔案 列印檔案內容 可利用collect 函式,它能夠以陣列的形式,返回rdd資料集的所有元素 lines spark.read.text file...