JStorm之Topology提交服務端

2021-06-28 06:15:29 字數 2801 閱讀 2451

topology提交前會先判斷集群中是否存在同名作業,如果存在在提交失敗,如果沒有則會增加集群提交次數submittedcount,每次提交成功,該變數都會加1,然後會為該作業分配乙個id,生成規則如下:

public static string topologynametoid(string topologyname, int counter)
因此我們從作業id中就可以判斷集群作業成功提交次數、提交時間、還有作業名稱了,如果我們沒有指定acker數量,,storm會自動為我們生成乙個,然後進入作業校驗,對topology本身的校驗比較細緻:

1、元件id是否合法

2、是否存在同名id

3、woker數量是否合法,小於0或null

4、ack數量校驗同worker一樣

public void submittopologywithopts(string topologyname,

string uploadedjarlocation, string jsonconf,

stormtopology topology, submitoptions options)

throws alreadyaliveexception, invalidtopologyexception,

topologyassignexception, texception catch (alreadyaliveexception e) catch (throwable e)

//成功提交次數加1

int counter = data.getsubmittedcount().incrementandget();

string topologyid = common.topologynametoid(topologyname, counter);

try

serializedconf.put(config.topology_id, topologyid);

serializedconf.put(config.topology_name, topologyname);

mapstormconf;

stormconf = nimbusutils.normalizeconf(conf, serializedconf,

topology);

maptotalstormconf = new hashmap(

conf);

totalstormconf.putall(stormconf);

stormtopology normalizedtopology = nimbusutils.normalizetopology(

stormconf, topology, false);

// 校驗id、字段合法性,worker和acker數量合法性

common.validate_basic(normalizedtopology, totalstormconf,

topologyid);

// don't need generate real topology, so skip common.system_topology

// common.system_topology(totalstormconf, topology);

stormclusterstate stormclusterstate = data.getstormclusterstate();

// create /local-dir/nimbus/topologyid/***x files

// copy jar to /local-dir/nimbus/topologyid/stormjar.jar

setupstormcode(conf, topologyid, uploadedjarlocation, stormconf,

normalizedtopology);

// generate taskinfo for every bolt or spout in zk

// 為每個元件建立相應znode,並存放相應資料,資料如下:

/****

**///zk的目錄結構如下:

//[zk: localhost:2181(connected) 20] ls /jstorm/tasks/test-3-1421404402

// [3, 2, 1, 6, 5, 4]

setupzktaskinfo(conf, topologyid, stormclusterstate);

// make assignments for a topology

log.info("submit for " + topologyname + " with conf "

+ serializedconf);

//這裡開始任務分發,任務分發由topologyassign完成,這步僅僅是建立乙個事件物件放入佇列中,然後返回

//真正的任務分發由其他執行緒來操作,所以這裡返回比較快,除非佇列是滿的

// servicehandler中

makeassignment(topologyname, topologyid, options.get_initial_status());

} catch (failedassigntopologyexception e)

}

private void makeassignment(string topologyname, string topologyid, 

topologyinitialstatus status) throws failedassigntopologyexception else

}

JStorm之Topology提交客戶端

乙個topology包含一或多個spout bolt,spout負責在資料來源獲得資料並傳送給bolt,每個bolt負責做完處理後發給下乙個bolt。通常topology的建立是由topologybuilder來建立的,該元件會記錄包含哪些spout bolt,並做相應驗證 各元件是否有id衝突,校...

JStorm之Supervisor啟動流程

4 分配新的任務 該元件主要包含 心跳執行緒 supervisor事件接受執行緒 處理執行緒,一旦事件接受到則會進入任務分配環節,主要邏輯 如下 public static void main string args public void run catch exception e while s...

Jstorm排程規則

任務排程演算法以worker為維度 排程過程中正在進行的排程動作不會對已發生的排程動作產生影響 排程過程中使用者可以自定義 usedefined assignment,和使用已有的old assignment,這兩者的優先順序是 usedefined assignment old assignmen...