storm學習三 drpc學習

2021-06-22 21:41:17 字數 3400 閱讀 1205

1. drpc介紹

storm是乙個分布式實時處理框架,它支援以drpc方式呼叫.可以理解為storm是乙個集群,drpc提供了集群中處理功能的訪問介面.

其實即使不通過drpc,而是通過在topoloye中的spout中建立乙個tcp/http監聽來接收資料,在最後乙個bolt中將資料傳送到指定位置也是可以的。這是後話,後面再進行介紹。而dprc則是storm提供的一套開發組建,使用drpc可以極大的簡化這一過程。

通過配置drpc伺服器,將storm的topology發布為drpc服務。客戶端程式可以呼叫drpc服務將資料傳送到storm集群中,並接收處理結果的反饋。這種方式需要drpc伺服器進行**,其中drpc伺服器底層通過thrift實現。適合的業務場景主要是實時計算。並且擴充套件性良好,可以增加每個節點的工作worker數量來動態擴充套件。

埠可以不用配置,預設是:3772

nimbus節點的配置:

storm.zookeeper.servers:

- "10.10.249.195"

- "10.10.249.196"

## nimbus.host: "nimbus"

## locations of the drpc servers

drpc.servers:

- "10.10.249.197"

supervisor節點的配置:

########### these must be filled in for astorm configuration

storm.zookeeper.servers:

- "10.10.249.195"

- "10.10.249.196"

#nimbus.host: "10.10.249.195"

### locations of the drpc servers

drpc.servers:

- "10.10.249.197"

#    - "server2"

supervisor.slots.ports:

-6700

-6701

-6702

2.drpc的使用

drpc包括服務端和客戶端兩部分。引用官方的一張來進行說明:

1)服務端

服務端由四部分組成:包括乙個drpc server, 乙個 dprc spout,乙個topology和乙個returnresult。

在實際使用中,主要有三個步驟:

a.啟動storm中的drpc server;

首先,修改storm/conf/storm.yaml中的drpc server位址(上面已經給出例子);

需要注意的是:必須修改所有nimbus和supervisor上的配置檔案,設定drpc server位址。否則在執行過程中可能無法返回結果。

然後,通過

./storm drpc  

命令啟動drpc server。

b.建立乙個drpc 的topology,提交到storm中執行。

該toplogy和普通的topology稍有不同,可以通過兩種方式建立:

建立方法一:直接使用 storm 提供的lineardrpctopologybuilder。 (不過該方法在0.82版本中顯示為已過期,不建議使用)

public static class exclaimbolt extends basebasicbolt 

public void declareoutputfields(outputfieldsdeclarer declarer)

}public static void main(string args) throws exception )

cluster.shutdown();

drpc.shutdown();

} else

}

建立方法二:

直接使用 storm 提供的通用topologybuilder。 不過需要自己手動加上開始的drpcspout和結束的returnresults。

topologybuilder builder = new topologybuilder(); 

//開始的spout

drpcspout drpcspout = new drpcspout("exclamation");

builder.setspout("drpc-input", drpcspout,5);

//真正處理的bolt

builder.setbolt("cpp", new cppbolt(), 5)

.nonegrouping("drpc-input");

//結束的returnresults

builder.setbolt("return", new returnresults(),5)

.nonegrouping("cpp");

config conf = new config();

conf.setdebug(false);

conf.setmaxtaskparallelism(3);

trycatch (exception e)

c.通過drpcclient對cluster進行訪問需要修改客戶端配置檔案 ~/.storm/storm.yaml,配置drpc server的位址。修改方法可storm服務端一樣。

訪問**就很簡單了:

drpcclient client = new drpcclient("10.100.211.232", 3772);

string result = client.execute("exclamation","test");

注意如果是本地模式,topology的提交和drpc的訪問都有不同。

localdrpc drpc = new localdrpc();

localcluster cluster = new localcluster();

cluster.submittopology("drpc-demo", conf,builder.createlocaltopology(drpc));

// 訪問

for (string word : new string )

cluster.shutdown();

drpc.shutdown();

Storm集群的DRPC模式

storm的drpc模式的作用是實現從遠端呼叫storm集群的計算資源,而不需要連線到集群的某乙個節點。ok。那麼storm實現drpc主要是使用lineardrpctopologybuilder這個類。下面就先來看看乙個簡單的例子,它的原始碼的github上。1 2 3 4 5 6 7 8 9 1...

Storm集群的DRPC模式

storm的drpc模式的作用是實現從遠端呼叫storm集群的計算資源,而不需要連線到集群的某乙個節點。ok。那麼storm實現drpc主要是使用lineardrpctopologybuilder這個類。下面就先來看看乙個簡單的例子,它的原始碼的github上。import backtype.sto...

storm學習筆記

1 基礎概念 元組 訊息傳遞的基本單元,支援所有的基本型別 字串和位元組陣列作為字段值。流 流由元組組成,spout是流的源頭從外部資料來源讀取元組並emit到拓撲中,bolt接收任何數量的輸入流執行處理後可能提交新的流。spout spout是拓撲的流的 是乙個拓撲中產生源資料流的元件。spout...