python寫map,reduce訪問集群檔案

2021-07-30 03:15:28 字數 1651 閱讀 6679

python寫map,reduce訪問集群檔案

#!/usr/bin/env python

#coding:utf-8

import sys

for line in sys.stdin:  # 遍歷讀入資料的每一行

line = line.strip()  # 將行尾行首的空格去除

words = line.split()  #按空格將句子分割成單個單詞

for word in words:

print '%s\t%s' %(word, 1)

reducer.py

#!/usr/bin/env python

#coding:utf-8

from operator import itemgetter

import sys

current_word = none   # 為當前單詞

current_count = 0  # 當前單詞頻數

word = none

for line in sys.stdin:

words = line.strip()  # 去除字串首尾的空白字元

word, count = words.split('\t')  # 按照製表符分隔單詞和數量

try:

count = int(count)  # 將字串型別的『1』轉換為整型1

except valueerror:

continue

if current_word == word:  # 如果當前的單詞等於讀入的單詞

current_count += count  # 單詞頻數加1

else:

if current_word:  # 如果當前的單詞不為空則列印其單詞和頻數

print '%s\t%s' %(current_word, current_count)  

current_count = count  # 否則將讀入的單詞賦值給當前單詞,且更新頻數

current_word = word

if current_word == word:

print '%s\t%s' %(current_word, current_count)

修改這兩個檔案的許可權:

chmod +x reducer.py

上傳乙個文字檔案到集群上,我是傳到了/user/zkf/test-pythoninhadoop/input目錄下

設定streaming檔案中的jar包:

cd hadoop目錄下,找到hadoop-straming*.jar檔案,寫入環境變數,具體命令如下:

cd /esr/local/hadoop-1.2.1

find ./ -name "*streaming*"     ###找到hadoop-streaming*.jar位置

vi ~/.bashrc    ###開啟環境變數進行配置

export stream=/usr/local/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar

執行這一檔案,命令如下:

執行成功會在集群的/user/zkf/test-pythoninhadoop/output下得到結果,檢視可得到wordcount結果

Python基礎 高階函式 Map Reduce

map 函式接受2個引數 1.引數 函式 2.引數 iterable 示例 usr bin env python3 coding utf 8 map 函式的使用 轉換為字串 deffuntion x return str x deftest mlist 1,2,3,4,5 list 1,2,3,4,...

廖雪峰Python教程之mapreduce

1.map 函式 map 函式接收兩個引數,乙個是函式,乙個是iterable,map將傳入的函式依次作用到序列的每個元素,並把結果作為新的iterator返回。def f x return x x r map f,1,2,3,4,5 list r out 1,4,9,16,25 2.reduce ...

python學習進度11(map reduce)

python內建了map 和reduce 函式。如果你讀過google的那篇大名鼎鼎的 mapreduce simplified data processing on large clusters 你就能大概明白map reduce的概念。我們先看map。map 函式接收兩個引數,乙個是函式,乙個是...