海量資料處理 一

2021-06-13 03:25:01 字數 4368 閱讀 1550

想法:1.hash對映:順序讀取10個檔案,按照hash(ip)%10的結果將資料寫入到另外10個檔案中。

2. hash統計:依次對小檔案用hash_map(ip, ip_count)來統計每個ip出現的次數。

3.堆/快速/歸併排序:利用快速/堆/歸併排序按照出現次數進行排序,將排序好的ip和對應的ip_cout輸出到檔案中,這樣得到了10個排好序的檔案。最後,對這10個檔案進行歸併排序(內排序與外排序相結合)

實踐:0. 模擬海量資料分布式儲存

0.1生成海量資料

import random

from time import ctime

# 生成海量資料

def generaterandom(rangefrom, rangeto):

return random.randint(rangefrom,rangeto)

def generagemassiveipaddr(filelocation,numberoflines):

ip =

file_handler = open(filelocation, 'a+')

for i in range(numberoflines):

file_handler.writelines(ip)

file_handler.close()

if __name__ == '__main__':

print(ctime())

for i in range(10):

print(' ' + str(i) + ": " + ctime())

generagemassiveipaddr('e:\\massiveip.txt', 1000000)

print(ctime())

0.2 將海量資料分成10個小檔案

from time import ctime

import os

#將海量資料拆分成小的檔案

def splitfile(filelocation, targetfoler):

file_handler = open(filelocation, 'r')

block_size = 1006633 # 14.4m

line = file_handler.readline()

temp =

countfile = 1

while line:

for i in range(block_size):

if i == (block_size-1):

# write block to small files

file_writer = open(targetfoler +"\\file_"+str(countfile)+".txt", 'a+')

file_writer.writelines(temp)

file_writer.close()

temp =

print(" file " + str(countfile) + " generated at: " + str(ctime()))

countfile = countfile + 1

else:

line=file_handler.readline()

file_handler.close()

if __name__ == '__main__':

print("start at: " + str(ctime()))

os.makedirs('e:\\massivedata')

splitfile("e:\\massiveip.txt", "e:\\massivedata")

1. 對10個小檔案進行hash對映,使得相同的ip分在同乙個小檔案中

from time import ctime

import os

datadir = "e:\\massivedata"

tempdir = "e:\\temp"

def hashfiles():

fs =

if not os.path.exists(tempdir):

os.makedirs(tempdir)#建立緩衝區

for f in range(0,10):

for parent, dirnames, filenames in os.walk(datadir):#遍歷datadir

for filename in filenames:

f = open(os.path.join(parent, filename),'r')

for ip in f:

fs[hash(ip)%10].write(ip)

f.close()

for f in fs:

f.close()

if __name__ == '__main__':

print("start at: " + str(ctime()))

hashfiles()

print("end at: " + str(ctime()))

2. 對10個小檔案中的ip數進行統計,重複最多的ip放在前面,包括ip和次數

from time import ctime

import os

import operator

tempdir = "e:\\temp"

def sortipinfile():

'''對每個小檔案中的資料進行統計排序'''

fs =

if not os.path.exists(tempdir):

return

for f in range(0,10):

for f in fs:

d = {}

for ip in f:

if ip in d:

d[ip] += 1

else:

d[ip] = 1

sorted_d = sorted(d.items(), key=operator.itemgetter(1), reverse=true)

f.seek(0,0)

f.truncate()#清空小檔案內容

for item in sorted_d:#將排好序的內容寫入小檔案

f.write(str(item[1]) + "\t" + item[0])

f.close()

if __name__ == '__main__':

print("start at: " + str(ctime()))

sortipinfile()

print("end at: " + str(ctime()))

3. 堆排序

from time import ctime

import os

import heapq

tempdir = "e:\\temp"

destfile = "e:\\sorted.txt"

def decorated_file(f):

""" yields an easily sortable tuple.

"""# 迭代函式,避免將資料一次讀入記憶體

for line in f:

count, ip = line.split('\t',2)

yield (-int(count), ip)

def mergefiles():

fs =

if not os.path.exists(tempdir):

return

for f in range(0,10):

#已排序檔案tmp_i,txt列表

f_dest = open(destfile,"w")#存放最終排好序的結果

lines_written = 0

#呼叫堆排序演算法 merge(*iterables)

for line in heapq.merge(*[decorated_file(f) for f in fs]):

f_dest.write(line[1])

lines_written += 1

return lines_written

if __name__ == '__main__':

print("start at: " + str(ctime()))

print("sorting completed, total queries: ", mergefiles())

print("end at: " + str(ctime()))

海量資料處理 一

題目一 搜尋引擎會通過日誌檔案把使用者每次檢索使用的所有檢索串都記錄下來,每個查詢串的長度為1 255位元組。假設目前有一千萬個記錄 這些查詢串的重複度比較高,雖然總數是1千萬,但如果除去重複後,不超過3百萬個。乙個查詢串的重複度越高,說明查詢它的使用者越多,也就是越熱門。請你統計最熱門的10個查詢...

海量資料處理

1 有一千萬條簡訊,有重複,以文字檔案的形式儲存,一行一條,有 重複。請用5分鐘時間,找出重複出現最多的前10條。方法1 可以用雜湊表的方法對1千萬條分成若干組進行邊掃瞄邊建雜湊表。第一次掃瞄,取首位元組,尾位元組,中間隨便兩位元組作為hash code,插入到hash table中。並記錄其位址和...

海量資料處理

給定a b兩個檔案,各存放50億個url,每個url各占用64位元組,記憶體限制是4g,如何找出a b檔案共同的url?答案 可以估計每個檔案的大小為5g 64 300g,遠大於4g。所以不可能將其完全載入到記憶體中處理。考慮採取分而治之的方法。遍歷檔案a,對每個url求取hash url 1000...