python利用多核CPU實現mapreduce

2021-06-28 07:25:17 字數 3188 閱讀 8847

1. 最近公司用有個比較奇怪的需求,需要在流水中查詢某一條符合條件的流水記錄,記錄是在hdfs上的,按天存在檔案中,但是檔案都比較大,每天大概是25g的流水資料,現在提供刷卡回執單去查詢該消費記錄在我們hdfs上的對應的記錄,從而可以找到某個資訊(不能說是哪個。。。。)

2. 刷卡回執單我們可以找到卡號前6位、後四位,消費的時間,消費的金額,最初我是用管道來一行行排除的,就是cat *** | grep *** | grep *** | grep ***,但是這樣大概需要10幾分鐘才能出結果

3. 顯然不合理,比較好的做法是直接寫個mapreduce(只涉及到map,reduce直接輸出,這個我寫了,先貼**):

map.py

#!/usr/bin/env python

# vim: set fileencoding=utf-8

import sys

import os

def main():

card_start = os.environ.get('card_start')

card_last = os.environ.get('card_last')

trans_at = float(os.environ.get('trans_at'))

for line in sys.stdin:

detail = line.strip().split(',')

card = detail[0]

money = float(detail[17])

if trans_at == money and card_start == card[1 : 7] and card_last == card[-4 : ]:

print '%s\t%s' % (line.strip(), detail[1])

if __name__ == '__main__':

main()

reduce.py

#!/usr/bin/env python

# vim: set fileencoding=utf-8

import sys

import os

def main():

for line in sys.stdin:

detail = line.strip().split('\t')

print '%s\t%s' % (detail[0], detail[1])

if __name__ == '__main__':

main()

4. 其實這個需求並不麻煩,但是用線上的集群取跑難免會小題大做,然後想到了用python操作cpu多核去模擬mapreduce來做(其實這邊也只涉及到map,如果要統計一些資訊的話就需要reduce了)

5. 直接貼**:

#!/usr/bin/env python

# vim: set fileencoding=utf-8

'''created on jan 15, 2015

@author: qianjc

'''import sys

import multiprocessing

import time

class trmapreduce:

def __init__(self, map_fun, reduce_fun, num_workers=none):

'''map_fun: map函式,返回格式如: [(a, 1), (b, 1)]

reduce_fun: reduce函式, 返回格式如: (c, 10)

num_workers: 使用的多程序個數, 如果不指定就是預設cpu的核數

'''self.map_fun = map_fun

self.reduce_fun = reduce_fun

self.card_start = card_start

self.card_last = card_last

self.trans_at = trans_at

self.pool = multiprocessing.pool(num_workers)

def __call__(self, inputs, chunksize=1):

return self.pool.map(self.map_fun, inputs, chunksize=chunksize)

card_start = '@666666'

card_last = '1111'

trans_at = 0.0

# 用來過濾流水

'''根據卡號前6位、後四位以及消費金額尋找匹配的流水記錄

'''# print line.strip()

detail = line.strip().split(',')

card = detail[0]

money = float(detail[17])

global card_start

global card_last

global trans_at

output =

if trans_at == money and card_start == card[0 : 7] and card_last == card[-4 : ]:

print line.strip()

def reducer():

# 目前用不到reduce

pass

def main(argv):

start = time.time()

global card_start

global card_last

global trans_at

card_start = '@' + str(argv[1])

card_last = str(argv[2])

trans_at = float(argv[3]) * 100

res = trmapreduce(sys.stdin, 10000)

end = time.time()

print 'cost time: ', end - start

if __name__ == '__main__':

main(sys.argv)

主要參考:

實際測試了的確是多核在工作,但是沒有測試可以節約多少時間,我覺得應該時間上跟grep差不多,反正就當學了個思想吧!!!

利用CPU多核處理

在mysql5.5.x後,可以利用innodb read io threads和innodb write io threads,取代之前的innodb file io threads引數,在linux平台上就可以根據cpu核數來更改相應的引數值,預設是4.比如cpu是2棵8核的,可以設定 innod...

多核CPU利用測試

一直在想程式上是否特意讓執行緒在指定的cpu上去執行,這樣可以提高執行效率,所以特地寫個 讓cpu使用率畫正弦曲線的實驗,我使用的是amd x4 641的cpu,為四核四執行緒的 如下 include stdafx.h include include include include using na...

用 taskset 充分利用多核cpu

常常感覺系統資源不夠用,一台機子上跑了不下3個比較重要的服務,但是每天我們還要在上面進行個備份壓縮等處理,網路長時間傳輸,這在就很影響本就不夠用的系統資源 這個時候我們就可以把一些不太重要的比如copy 備份 同步等工作限定在一顆cpu上,或者是多核的cpu的一顆核心上進行處理,雖然這不一定是最有效...