hadoop hadoop的一次讀取

2021-09-01 17:10:37 字數 2948 閱讀 9329

一次hadoop的read

getfilesystem

**

public static filesystem getfilesystem() throws exception

configuration

configuration基本就是乙個空物件。新增了2個配置檔案到資源列表。

adddefaultresource("core-default.xml");

adddefaultresource("core-site.xml");

第一次通過configuration獲取param時才觸發資源載入解析。

檔案系統的cache

static class cache 

if (fs != null)

fs = createfilesystem(uri, conf);

synchronized (this)

// now insert the new file system into the map

if (map.isempty() && !clientfinalizer.isalive())

fs.key = key;

map.put(key, fs);

return fs;

}}

由uri uri, configuration conf作為key,對filesystem做了快取。

初始化檔案系統

private static filesystem createfilesystem(uri uri, configuration conf

) throws ioexception

filesystem fs = (filesystem)reflectionutils.newinstance(clazz, conf);

fs.initialize(uri, conf);

return fs;

}

由config中的fs.hdfs.impl得到檔案系統的實現類。這裡就是org.apache.hadoop.hdfs.distributedfilesystem。初始化distributedfilesystem,這樣distributedfilesystem就可以和namenode通訊了。

read file content

**

/**

* linux cat file.

* */

public static void readfile(string path) throws exception finally

system.out.println("--------------------------------------");

}

解析path

檔案的path為

hdfs:

解析後為

scheme hdfs

authority 192.168.81.130:9001

path /user/allen/input4wordcount/test_text_01.txt

開啟fsdatainputstream

聯絡namenode取到block資訊,注意這裡是乙個範圍查詢。查詢結果快取起來。

locatedblocks newinfo = callgetblocklocations(namenode, src, 0, prefetchsize);

prefetchsize = 671088640

在cache中查詢block

public int findblock(long offset) 

};return collections.binarysearch(blocks, key, comp);

}

注意這裡的comparator有乙個特殊處理。為了fake key可以和待查詢的locatedblock相等。

如果cache不命中則重新查詢namenode

int targetblockidx = locatedblocks.findblock(offset);

if (targetblockidx < 0)

更新原有cache

public void insertrange(int blockidx, listnewblocks)  else if(newoff == oldoff) 

insstart = insend = newidx+1;

oldidx++;

} else

}insend = newblocks.size();

if(insstart < insend)

}

選擇datanode

/**

* pick the best node from which to stream the data.

* entries in nodes are already in the priority order

*/private datanodeinfo bestnode(datanodeinfo nodes,

abstractmapdeadnodes)

throws ioexception }}

throw new ioexception("no live nodes contain current block");

}

當和datanode建立連線時,如果出錯。則3秒後(程式hard code)聯絡namenode重新獲取datanode的資訊。當重試超過一定次數時,則報錯。

建立連線,讀取內容

注意這裡有乙個簡單的檔案協議。

Hadoop hadoop的二次排序的思想

eg.輸入 輸出 關鍵點自定義,結合資料型別 作為map 函式輸出key 自定義分割槽partition 按照第乙個字段進行分割槽 自定義分組grouping 按照第乙個子彈進行分組 文字整理 將需要排序的字段封裝成乙個物件作為key,使用自定義資料型別可實現 通過mapreduce自帶shuffl...

只有一次 只有一次

二十多歲的年紀我累了很久,也在努力的尋找機會向陽而生。十三歲時迫於生活父母把剛上初一的我獨自留在老家,被父母伺候慣的我沒有一點生活常識,不懂得如何把衣服洗淨,不懂得如何生火做飯,不懂得如何花錢,不懂得如何戰勝黑暗和孤獨,每次乙個人回家時都會出現幻聽和幻覺,看著家裡好像有炊煙,聽誰都像媽媽的聲音.好不...

第一次出現一次的字元

題目 在字串中找出第乙個出現一次的字元。如輸入 abaccdeff 則輸出 b 分析 最直觀的解法從頭掃瞄這個字串中的每乙個字元。當訪問到某個字元的時候拿這個字元和後面的字元相比較,如果在後面沒有發現重複的字元,那該字元就是只出現一次的字元。如果字串有n個字元,每乙個字元可能與後面的o n 個字元比...