python對檔案進行平行計算初探

2022-06-12 09:21:09 字數 4007 閱讀 7597

最近工作中經常會有讀取乙個檔案,對資料做相關處理並寫入到另外乙個檔案的需求

當檔案行數較少的時候,單程序順序讀取是沒問題的,但是當檔案行數過萬,就需要消耗很客觀的時間。

一、一次性讀入,多程序處理

我最初想到的辦法是多程序,最初的辦法是一次性讀取所有行,然後分配給多個程序處理,最終還是寫入乙個檔案。其中需要借助queue來實現對異常的捕獲和處理,不具有可擴充套件性。

同時一次性讀取乙個檔案,寫入記憶體也受到記憶體大小的限制。而且這種多程序情況下返回值的處理也比較麻煩。

**見python併發——多程序中的異常捕獲

二、多次讀入,並行處理

考慮到linux有乙個按行分割檔案的功能split,可以借助她實現資料平行計算,思路是這樣的,通過計算檔案的總行數,將檔案分割成行數相等的多個小檔案,小檔案個數可以大於或等於併發度。

開啟多程序對每個小檔案分別處理,每個小檔案處理完都輸出到一一對應的目標小檔案,最終將目標小檔案進行合併。

**如下:

from multiprocessing import

pool

import

json

from time import

sleep

import

requests

import

ossrc_mid = '

_src_

'dst_mid = '

_dst_'#

業務邏輯處理

defget_jw(addr_name):

url = '

'result = requests.get(url.format(addr_name=addr_name))

result_str = str(result.content, encoding="

utf-8")

rj =json.loads(result_str)

if len(rj['

geocodes

']) >0:

jwd = rj['

geocodes

'][0]['

location']

print

(jwd)

return addr_name + '

,' + jwd + '\n'

else

:

print('

-,-'

)

return addr_name + '

,' + '

-,-' + '\n'

#輸入原始檔,返回分割後的源中間檔案list

class

parallelcompute(object):

def__init__(self, exe_func, source_file, target_file, concurrency=8):

self.exe_func =exe_func

self.source_file =source_file

self.target_file =target_file

self.concurrency =concurrency

self.abs_src_mid_dir =none

self.abs_dst_mid_dir =none

self.src_mid_file_list =none

self.dst_mid_file_list =none

#原始檔分割成多個小檔案

defsplit_file(self):

cur_path = os.path.abspath('.'

) self.abs_src_mid_dir =os.path.join(cur_path, src_mid)

self.abs_dst_mid_dir =os.path.join(cur_path, dst_mid)

os.mkdir(self.abs_src_mid_dir)

os.mkdir(self.abs_dst_mid_dir)

split_cmd = "

split /

".format(src_file=self.source_file,

abs_src_mid=self.abs_src_mid_dir)

print

(split_cmd)

os.system(split_cmd)

self.src_mid_file_list = [os.path.join(self.abs_src_mid_dir, it) for it in

os.listdir(self.abs_src_mid_dir)]

self.dst_mid_file_list = [src_file.replace(src_mid, dst_mid) for src_file in

self.src_mid_file_list]

#小檔案處理

deftranslate_file(self, src_file, dst_file):

with open(src_file, 'rb

') as f1, open(dst_file, 'a'

) as f2:

line =f1.readline().strip()

line = str(line, encoding='

utf8')

while

line:

try:

jw =self.exe_func(line)

f2.write(jw)

except

exception:

sleep(5)

offset = len(line.encode('

utf8

')) + 1f1.seek(-offset, 1)

line =f1.readline().strip()

line = str(line, encoding='

utf8')

#小檔案合併

defmerge_files(self):

with open(self.target_file, 'a

') as f2:

for dst_m_file in

self.dst_mid_file_list:

with open(dst_m_file, 'r

') as f1:

line =f1.readline()

while

line:

f2.write(line)

line =f1.readline()

#清理中間檔案

defdelete_mid_dir(self):

os.system(

'rm -rf %s

' %self.abs_src_mid_dir)

os.system(

'rm -rf %s

' %self.abs_dst_mid_dir)

defexecute(self):

p =pool(self.concurrency)

self.split_file()

for src_mid_file in

self.src_mid_file_list:

dst_mid_file =src_mid_file.replace(src_mid, dst_mid)

p.close()

p.join()

self.merge_files()

self.delete_mid_dir()

if__name__ == '

__main__':

source_file = '

/opt/test/qiuxue/target.txt

'target_file = '

/opt/test/qiuxue/result3.txt

'pc =parallelcompute(get_jw, source_file, target_file)

pc.execute()

這樣就做到了平行計算和業務邏輯的分離,簡化了呼叫者的使用難度

python對檔案進行平行計算初探 二)

上次的平行計算是通過將大檔案分割成小檔案,涉及到檔案分割,其實更有效的方法是在記憶體中對檔案進行分割,分別計算 最後將返回結果直接寫入目標檔案,省去了分割小檔案合併小檔案刪除小檔案的過程 如下 import json import math from multiprocessing import p...

python平行計算 python平行計算

0.基礎並行 發 multiprocessing threading 1.concurrent 2.併發 asynico 3.ipython下的平行計算 使用ipyparallel庫的ipython提供了前所未有的能力,將科學python的探索能力與幾乎即時訪問多個計算核心相結合。系統可以直觀地與本...

平行計算模型

平行計算模型通常指從並行演算法 的設計和分析出發,將各種並行計算機 至少某一類並行計算機 的基本特徵抽象出來,形成乙個抽象的計算模型。從更廣的意義上說,平行計算模型為平行計算提供了硬體和軟體介面 在該介面的約定下,並行系統硬體設計者和軟體設計 者可以開發對並行性 的支援機制,從而提高系統的效能。有幾...