原 Storm分布式RPC

2022-06-26 01:03:14 字數 4841 閱讀 3877

分布式 rpc(drpc)的設計目標是充分利用 storm 的計算能力實現高密度的並行實時計算。storm 接收若干個函式引數作為輸入流,然後通過 drpc 輸出這些函式呼叫的結果。嚴格來說,drpc 並不能算作是 storm 的乙個特性,因為它只是一種基於 storm 原語 (stream、spout、bolt、topology) 實現的計算模式。雖然可以將 drpc 從 storm 中打包出來作為乙個獨立的庫,但是與 storm 整合在一起顯然更有用。

分布式rpc是通過「drpc server」協調處理的(storm用乙個包來實現該功能)。drpc server 負責接收 rpc 請求,並將該請求傳送到 storm 中執行的 topology,等待接收 topology 傳送的處理結果,並將該結果返回給傳送請求的客戶端。因此,從客戶端的角度來說,dprc 與普通的 rpc 呼叫並沒有什麼區別。例如,以下是乙個使用引數 「 呼叫 「reach」 函式計算結果的例子:

drpcclient client = new drpcclient("drpc-host", 3772);

string result = client.execute("reach", "");

分布式rpc工作流示意圖如下所示:

客戶端通過向 drpc 伺服器傳送待執行函式的名稱以及該函式的引數來獲取處理結果。實現該函式的拓撲使用乙個drpcspout 從 drpc server中接收乙個函式呼叫流,drpc server會為每個函式呼叫都標記了乙個唯一的 id,隨後拓撲會執行函式來計算結果,並在拓撲的最後使用乙個名為 returnresults 的 bolt 連線到 drpc server,根據函式呼叫的 id 來將函式呼叫的結果返回。

storm中提供了名為lineardrpctopologybuilder 的topology builder,它幾乎自動完成了drpc的所有步驟,如下所示:

1.設定spout

2.向drpc server返回執行結果。

3.給bolts提供了聚集元組的功能。

讓我們一起看一下簡單的例子,該例子是drpc topology的乙個實現並返回結果為輸入附加字串「!」。

public static class exclaimbolt extends basebasicbolt 

public void declareoutputfields(outputfieldsdeclarer declarer) }

public static void main(string args) throws exception

正如你所見,當建立lineardrpctopologybuilder 時,你需要讓topology知道 drpc函式的名字。單個drpc server負責很多函式的協調處理,且這些函式的功能不同。你宣告的第乙個bolt將接受乙個2元組,第乙個域是請求id,第二個域是請求引數。lineardrpctopologybuilder 中最後乙個bolt會輸出形式為[id,result]的2元組輸出流。最後,所有中間結果的元組的第乙個域必須包括請求id。

在本例子中,exclaimbolt 只是簡單地給第二個域附加字串「!」。lineardrpctopologybuilder 繼續和drpc server通訊並將結果返回。

drpc可以以本地模式執行,下面以本地模式執行的例子:

localdrpc drpc = new localdrpc();

localcluster cluster = new localcluster();

cluster.submittopology("drpc-demo", conf, builder.createlocaltopology(drpc));

system.out.println("results for 'hello':" + drpc.execute("exclamation", "hello"));

cluster.shutdown();

drpc.shutdown();

首先你會建立乙個 localdprc 物件,該物件會在程序中模擬乙個 drpc 伺服器,就像localcluster 在程序中模擬 storm 集群的功能一樣。然後,建立localcluster以本地模式執行topology 。lineardrpctopologybuilder 有獨立的方法用於建立本地topologies 和遠端topologies 。在本地模式下,localdrpc 物件沒有繫結任何埠所以topology需要知道正在和它進行通訊的物件,這是方法createlocaltopology 接受localdrpc 物件作為輸入引數的原因。

在啟動拓撲後,你可以使用 execute 方法來完成 drpc 呼叫。

在乙個真實的集群中使用 drpc 有以下三個步驟:

1.啟動 drpc server;

2.配置 drpc server的位址;

3.將 drpc topologies 提交到集群執行。

可以像 nimbus、supervisor 那樣使用 storm 命令來啟動 drpc serve,如下:

bin/storm drpc

接下來,你需要在集群上配置 drpc server的位址。這是為了讓 drpcspout 獲取從**觸發函式呼叫的方法。可以通過編輯 storm.yaml 或者新增拓撲配置的方式實現配置。配置 storm.yaml 的方式類似於下面這樣:

drpc.servers:

- "drpc1.foo.com"

- "drpc2.foo.com"

最後,你可以像其他拓撲一樣使用 stormsubmitter 來啟動拓撲。以下是使用遠端模式構造拓撲的乙個例子:

stormsubmitter.submittopology("exclamation-drpc", conf, builder.createremotetopology());
createremotetopology 方法是用來建立集群模式下執行的topologies 。

上面描述的exclamation drpc是為了說明drpc的簡單例子。下面讓我們共同學習一下更複雜的例子-storm集群平行計算的drpc函式呼叫,該例子是計算twitter上url的訪問。

url訪問是指不同的人在twitter上發的推文,你需要完成如下計算:

1.獲取所有tweeted了該url的所有人。

2.獲取所有關注了1中的所有人。

3.2中所有人的set集合。

4.統計3中set集合的個數。

一次計算可能涉及上百次的資料庫呼叫和數以千萬計的關注記錄,這計算規模確實很大。正如你所見,實現storm函式是非常簡單的。在單台機器上,計算需要1分鐘;但在集群中,即使最難計算的url訪問也只需數秒。

乙個簡單的訪問topology 可以在storm-starter中找到。下面是定義訪問topology 的具體步驟:

lineardrpctopologybuilder builder = new lineardrpctopologybuilder("reach");

builder.addbolt(new gettweeters(), 3);

builder.addbolt(new getfollowers(), 12)

.shufflegrouping();builder.addbolt(new partialuniquer(), 6)

.fieldsgrouping(new fields("id", "follower"));

builder.addbolt(new countaggregator(), 2)

.fieldsgrouping(new fields("id"));

topology 的執行按照以下四步驟:

1.gettweeters 得到tweeted了url的使用者。它將[id,url]格式的輸入流轉換為[id,tweeter]格式的輸出流。每個url將對映為多個tweeter元組。

2.getfollowers得到tweeters的關注者。它將[id,tweeter]格式的輸入流轉換為[id,follower]格式的輸出流。這些任務中,可能有重複的元組,因為一些人可能關注了多個人都tweeted了同樣的url。

3.partialuniquer根據關注者id分組。這會導致同樣的關注者在同樣的任務中處理,所以每個partialuniquer 的任務將會接受多個互補的關注者集合。一旦partialuniquer 接受了所有的關注者元組,它將會輸出關注者子集合元素的個數。

4.最後,countaggregator 從partialuniquer 接受部分count值然後累加完成整個計算,並返回結果。

下面看partialuniquer bolt的**實現:

public class partialuniquer extends basebatchbolt 

@override

public void execute(tuple tuple)

@override

public void finishbatch()

@override

public void declareoutputfields(outputfieldsdeclarer declarer) }

lineardrpctopologybuilder 只處理線性drpc的topology,它的計算是乙個序列步驟。不難想象功能需求將需要更複雜的topology ,它可能涉及到bolts的分支與整合。

keyedfairbolt 同時處理多個請求。

如何直接使用coordinatedbolt。

storm分布式安裝

1.機器a和b成功安裝zookeeper.3.檢視配置檔案 storm.zookeeper.servers 111.222.333.444 555.666.777.888 4.開啟配置檔案conf storm.yaml,這裡是配置zookooper servers 11 這裡代表是機器a的主機名55...

Storm分布式集群搭建

1.主要配置專案 配置資料目錄hostname 連線leader埠 leader選舉埠 server.101 mad101 2801 2802 server.102 mad102 2801 2802 server.103 mad103 2801 2802 2.在datadir目錄新建myid檔案 在...

深入RPC分布式原理 python

分布式本質上不過是將多個單機服務組合在一起對外提供服務 1 客戶端 當 rpc 服務部署在多個節點上時,客戶端得到的是乙個服務列表,有多個 ip 埠對。客戶端的連線池可以隨機地挑選任意的 rpc 服務節點進行連線,每個服務節點應該有個權重值,當所有節點的權重值一樣時,它們的流量分配就是均勻的。如果某...