JStorm之Topology提交客戶端

2021-06-28 06:08:01 字數 2098 閱讀 9655

乙個topology包含一或多個spout bolt,spout負責在資料來源獲得資料並傳送給bolt,每個bolt負責做完處理後發給下乙個bolt。通常topology的建立是由topologybuilder來建立的,該元件會記錄包含哪些spout bolt,並做相應驗證:各元件是否有id衝突,校驗方法如下:

private void validateunusedid(string id) 

if (_spouts.containskey(id))

if (_statespouts.containskey(id))

}

topologybuilder會儲存各個元件到相應的資料結構中,資料結構如下:

public class topologybuilder
元件配置資訊存放方法如下

private void initcommon(string id, icomponent component, number parallelism) 

map conf = component.getcomponentconfiguration();

if (conf != null)

common.set_json_conf(utils.to_json(conf));

_commons.put(id, common);

}

資訊儲存好後,在topology階段builder會根據這些資訊建立乙個stormtopology例項,然後由stormsubmitter.submittopology進行提交,該階段分兩步:1、上傳jar檔案 2、提交作業

public static void submittopology(string name, map stormconf,

stormtopology topology, submitoptions opts)

throws alreadyaliveexception, invalidtopologyexception

stormconf = new hashmap(stormconf);

stormconf.putall(utils.readcommandlineopts());

map conf = utils.readstormconfig();

conf.putall(stormconf);

putuserinfo(conf, stormconf);

try else

//上傳jar檔案,下面會詳細解釋這個方法

submitjar(conf);

try else

} finally

} log.info("finished submitting topology: " + name);

} catch (invalidtopologyexception e)

}

jar檔案上傳包含兩部分,jar檔案本身和其依賴的庫檔案都會被傳到服務端,預設上傳buf大小為512k,可以通過nimbus.thrift.max_buffer_size來調整buf大小,服務端儲存的目錄結構如下:

[hongmin.lhm@rt2l02045 ~]$tree /home/hongmin.lhm/jstorm_data/nimbus/inbox/

/home/hongmin.lhm/jstorm_data/nimbus/inbox/

`-- 7c1b7d1e-9134-4ed8-b664-836271b49bd3

`-- stormjar-7c1b7d1e-9134-4ed8-b664-836271b49bd3.jar

private static void submitjar(map conf) 

if (localjar != null)

submittedjar = submitjar(conf, localjar,

uploadlocation, client);

} else

} catch (exception e) finally

} else

}

JStorm之Topology提交服務端

topology提交前會先判斷集群中是否存在同名作業,如果存在在提交失敗,如果沒有則會增加集群提交次數submittedcount,每次提交成功,該變數都會加1,然後會為該作業分配乙個id,生成規則如下 public static string topologynametoid string top...

JStorm之Supervisor啟動流程

4 分配新的任務 該元件主要包含 心跳執行緒 supervisor事件接受執行緒 處理執行緒,一旦事件接受到則會進入任務分配環節,主要邏輯 如下 public static void main string args public void run catch exception e while s...

Jstorm排程規則

任務排程演算法以worker為維度 排程過程中正在進行的排程動作不會對已發生的排程動作產生影響 排程過程中使用者可以自定義 usedefined assignment,和使用已有的old assignment,這兩者的優先順序是 usedefined assignment old assignmen...