1. 最近公司用有個比較奇怪的需求,需要在流水中查詢某一條符合條件的流水記錄,記錄是在hdfs上的,按天存在檔案中,但是檔案都比較大,每天大概是25g的流水資料,現在提供刷卡回執單去查詢該消費記錄在我們hdfs上的對應的記錄,從而可以找到某個資訊(不能說是哪個。。。。)
2. 刷卡回執單我們可以找到卡號前6位、後四位,消費的時間,消費的金額,最初我是用管道來一行行排除的,就是cat *** | grep *** | grep *** | grep ***,但是這樣大概需要10幾分鐘才能出結果
3. 顯然不合理,比較好的做法是直接寫個mapreduce(只涉及到map,reduce直接輸出,這個我寫了,先貼**):
map.py
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
import os
def main():
card_start = os.environ.get('card_start')
card_last = os.environ.get('card_last')
trans_at = float(os.environ.get('trans_at'))
for line in sys.stdin:
detail = line.strip().split(',')
card = detail[0]
money = float(detail[17])
if trans_at == money and card_start == card[1 : 7] and card_last == card[-4 : ]:
print '%s\t%s' % (line.strip(), detail[1])
if __name__ == '__main__':
main()
reduce.py
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
import os
def main():
for line in sys.stdin:
detail = line.strip().split('\t')
print '%s\t%s' % (detail[0], detail[1])
if __name__ == '__main__':
main()
4. 其實這個需求並不麻煩,但是用線上的集群取跑難免會小題大做,然後想到了用python操作cpu多核去模擬mapreduce來做(其實這邊也只涉及到map,如果要統計一些資訊的話就需要reduce了)
5. 直接貼**:
#!/usr/bin/env python
# vim: set fileencoding=utf-8
'''created on jan 15, 2015
@author: qianjc
'''import sys
import multiprocessing
import time
class trmapreduce:
def __init__(self, map_fun, reduce_fun, num_workers=none):
'''map_fun: map函式,返回格式如: [(a, 1), (b, 1)]
reduce_fun: reduce函式, 返回格式如: (c, 10)
num_workers: 使用的多程序個數, 如果不指定就是預設cpu的核數
'''self.map_fun = map_fun
self.reduce_fun = reduce_fun
self.card_start = card_start
self.card_last = card_last
self.trans_at = trans_at
self.pool = multiprocessing.pool(num_workers)
def __call__(self, inputs, chunksize=1):
return self.pool.map(self.map_fun, inputs, chunksize=chunksize)
card_start = '@666666'
card_last = '1111'
trans_at = 0.0
# 用來過濾流水
'''根據卡號前6位、後四位以及消費金額尋找匹配的流水記錄
'''# print line.strip()
detail = line.strip().split(',')
card = detail[0]
money = float(detail[17])
global card_start
global card_last
global trans_at
output =
if trans_at == money and card_start == card[0 : 7] and card_last == card[-4 : ]:
print line.strip()
def reducer():
# 目前用不到reduce
pass
def main(argv):
start = time.time()
global card_start
global card_last
global trans_at
card_start = '@' + str(argv[1])
card_last = str(argv[2])
trans_at = float(argv[3]) * 100
res = trmapreduce(sys.stdin, 10000)
end = time.time()
print 'cost time: ', end - start
if __name__ == '__main__':
main(sys.argv)
主要參考:
實際測試了的確是多核在工作,但是沒有測試可以節約多少時間,我覺得應該時間上跟grep差不多,反正就當學了個思想吧!!!
利用CPU多核處理
在mysql5.5.x後,可以利用innodb read io threads和innodb write io threads,取代之前的innodb file io threads引數,在linux平台上就可以根據cpu核數來更改相應的引數值,預設是4.比如cpu是2棵8核的,可以設定 innod...
多核CPU利用測試
一直在想程式上是否特意讓執行緒在指定的cpu上去執行,這樣可以提高執行效率,所以特地寫個 讓cpu使用率畫正弦曲線的實驗,我使用的是amd x4 641的cpu,為四核四執行緒的 如下 include stdafx.h include include include include using na...
用 taskset 充分利用多核cpu
常常感覺系統資源不夠用,一台機子上跑了不下3個比較重要的服務,但是每天我們還要在上面進行個備份壓縮等處理,網路長時間傳輸,這在就很影響本就不夠用的系統資源 這個時候我們就可以把一些不太重要的比如copy 備份 同步等工作限定在一顆cpu上,或者是多核的cpu的一顆核心上進行處理,雖然這不一定是最有效...