在 Oracle 資料庫中實現 MapReduce

2021-07-02 01:43:07 字數 4790 閱讀 9388

在程式設計師開發並行程式時,map-reduce模式正變得流行起來。這些map-reduce程式通常來並行處理大量資料。本文來演示如何在oracle資料庫上,通過使用parallel pipelined table函式及並行操作,來實現map-reduce程式。(譯者注:table()是oracle中乙個函式,可以把定義為pipelined的function的返回結果進行sql查詢)

原理:pipelined table函式是在oracle 9i引入的,作為能在資料流中嵌入過程邏輯**方法。從邏輯上說,乙個table函式是可以出現在from子句中,該函式就像資料表一樣的返回多行資料。table函式同樣也可以接收多行資料做為輸入引數。大多數情況下,pipelined table函式可以嵌入到乙個資料流中,它讓資料「流」進sql語句中,從而避免增加乙個物理層(直譯:具體化的中介)。再次說明,pipelined table函式是可以並行處理的。

在多個發行版中,pipelined table函式已經被使用者使用,並成為oracle可擴充套件基礎功能的乙個核心部分。無論是外部使用者,還是oracle的開發部門,table函式成為乙個有效的、簡單的擴充資料庫核心功能的方法。

類似table函式的功能已經在oracle內使用,並且是oracle spatial 和oracle warehouse builder許多特色功能的實現方式。oracle spatial(空間資料處理系統)使用它涉及spatial joins 和許多 spatial data的資料探勘的操作。oracle warehouse builder讓讓使用者使用table 函式對資料流進行並行處理的邏輯,比如match-merge 演算法和其它逐行計算的演算法。

手把手的例子

所有的例子都在omr.sql檔案中。

為了說明並行的使用方法以及用pipelined table函式在oracle資料庫內寫乙個map-reduce演算法, 我們實現乙個最經典的map-reduce例子–單詞計數。單詞計數是實現返回一組文件中所有不重複單詞出現的個數的程式,也可以說是查詢單詞出現頻率功能。

示例**是用pl/sql實現,但如前所說,oracle允許你選擇其它語言來實現這個過程邏輯。

1、配置環境

我們將在一組文件中查詢,這些文件可以是資料庫之外的檔案中,也可以儲存在secure files/clob的資料庫內的列中。在我們這個存文件的表也相當於乙個檔案系統。

在本例中,我們將在資料庫內建立乙個表,用下面的宣告:

create

table documents (a clob)

lob(a) store as securefile(tablespace sysaux);

該錶的每一行都對應乙個文件,我們在用下面的語句,這個表中插入三個簡單的文件:

insert

into documents values ('abc def');

insert

into documents values ('def ghi');

insert

into documents values ('ghi jkl');

commit;

map**和reduce**都將包含在乙個包中,保持**的整潔。為了展示這些步驟,我將把這些**段從包中拿出來,在下面各小節展示。在實際的包中,還必須要定義幾個types。所有**均在oracle database 11g (11.1.0.6)測試通過。
create table documents (a clob)

lob(a) store as securefile(tablespace sysaux);

insert into documents values ('abc def');

insert into documents values ('def ghi');

insert into documents values ('ghi jkl');

commit;

create or replace

package oracle_map_reduce is

type word_t is record (word varchar2(4000));

type words_t is table of word_t;

type word_cur_t is ref cursor return word_t;

type wordcnt_t is record (word varchar2(4000), count number);

type wordcnts_t is table of wordcnt_t;

pipelined parallel_enable (partition doc by any);

function reducer(in_cur in word_cur_t) return wordcnts_t

pipelined parallel_enable (partition in_cur by hash(word))

cluster in_cur by (word);

end;

/create or replace

package body oracle_map_reduce is

---- and emits individual words

--pipelined parallel_enable (partition doc by any)

isdocument clob;

istart number;

pos number;

len number;

word_rec word_t;

begin

-- for every document

loop

fetch doc into document;

exit

when doc%notfound;

istart := 1;

len := length(document);

-- for every word within a document

while (istart <= len) loop

pos := instr(document, sep, istart);

if (pos = 0) then

word_rec.word := substr(document, istart);

pipe row (word_rec);

istart := len + 1;

else

word_rec.word := substr(document, istart, pos - istart);

pipe row (word_rec);

istart := pos + 1;

endif; end

loop; -- end

loop

for a single document

endloop; -- end

loop

for all documents

return;

---- the reducer emits words and the number of times they're seen

--function reducer(in_cur in word_cur_t) return wordcnts_t

pipelined parallel_enable (partition in_cur by hash(word))

cluster in_cur by (word)

isword_count wordcnt_t;

next varchar2(4000);

begin

word_count.count := 0;

loop

fetch in_cur into

next;

exit

when in_cur%notfound;

if (word_count.word is null) then

word_count.word := next;

word_count.count := word_count.count + 1;

elsif (next

<> word_count.word) then

pipe row (word_count);

word_count.word := next;

word_count.count := 1;

else

word_count.count := word_count.count + 1;

endif; end

loop;

if word_count.count <> 0

then

pipe row (word_count);

endif; return;

end reducer;

end;

/-- select statements

select word, count(*)

from (

select value(map_result).word word

group

by (word);

select *

from table(oracle_map_reduce.reducer(

cursor(select value(map_result).word word

cursor(select a from documents), ' ')) map_result)));

**:

在C 中操縱Oracle資料庫

oracle資料庫相關操作 定義使用者 sql connect sys oracle as sysdba sql create user sa identified by sa sql grant dba to sa sql connect sa sa 定義表 sql create table te...

oracle中資料庫自增列實現

oracle可以自定義序列,但是如果想實現多表序列管理,可以用乙個很簡單的表去圍護 邏輯很簡單,新建表如下 兩個字段,分別記錄專案表名和表中對應的最大序列號 然後在新增序列時圍護這個表,就實現了專案中表序列的圍護了。create table create table table seq table ...

ORACLE在新服務中建立資料庫

在database configuration assitant建立乙個新的服務之後,往往需要在新的服務裡面新增新的資料庫,如何通過sqlplus在新的服務裡面新增新的資料庫呢,答案如下 上面中的system是系統使用者名稱,需要從這裡進入oracle新建立的服務。注意是 再次,是在database...