DataX原始碼分析 2

2022-07-29 15:30:20 字數 2435 閱讀 9448

接著上一節…… 做好了切分工作,下一步當然就是對對應的各個任務進行任務託管和監控:schedule,post,posthandle,invokehooks。 schedule首先完成的工作是把上一步reader和writer split的結果整合到具體的taskgroupcontainer中。

int channelspertaskgroup = this.configuration.getint(

coreconstant.datax_core_container_taskgroup_channel, 5);

int tasknumber = this.configuration.getlist(

coreconstant.datax_job_content).size();

this.needchannelnumber = math.min(this.needchannelnumber, tasknumber);

perftrace.getinstance().setchannelnumber(needchannelnumber);

從上面的**看出,在不配置splitpk的情況下,單錶etl不管配置channel值為大於一的任何值,最後的channel數都為1。 公平的分配 task 到對應的 taskgroup中,返回configuration集合。將執行模式置為standalone,交給abstractscheduler,啟動所有的任務執行緒startalltaskgroup(configurations)。

@override

public void startalltaskgroup(listconfigurations)

this.taskgroupcontainerexecutorservice.shutdown();

}

真正擔任執行的是taskgroupcontainer.start()方法,該方法參與狀態匯報和task啟動,並實現自動容錯機制。具體的sql執行由plugin對應資料庫**完成。

while (true) 

//3.有任務未執行,且正在執行的任務數小於最大通道限制

iteratoriterator = taskqueue.iterator();

while(iterator.hasnext() && runtasks.size() < channelnumber)

.............

//4.任務列表為空,executor已結束, 蒐集狀態為success--->成功

if (taskqueue.isempty() && isalltaskdone(runtasks) && containercommunicator.collectstate() == state.succeeded) ] completed it's tasks.", this.taskgroupid);

break;

}// 5.如果當前時間已經超出匯報時間的interval,那麼我們需要馬上匯報

long now = system.currenttimemillis();

if (now - lastreporttimestamp > reportintervalinmillsec)

}thread.sleep(sleepintervalinmillsec);

}

不斷迴圈communication傳過來的訊息進行收集處理。

while (true) 

errorlimit.checkrecordlimit(nowjobcontainercommunication);

if (nowjobcontainercommunication.getstate() == state.succeeded)

if (isjobkilling(this.getjobid())) else if (nowjobcontainercommunication.getstate() == state.failed)

thread.sleep(jobsleepintervalinmillsec);

}

post方法對於reader啥也沒乾,對writer對處理資料後要做的事兒。

if (null != renderedpostsqls && !renderedpostsqls.isempty()) ]. context info:{}.",

stringutils.join(renderedpostsqls, ";"), jdbcurl);

writerutil.executesqls(conn, renderedpostsqls, jdbcurl, databasetype);

dbutil.closedbresources(null, null, conn);

}

好啦,大致重要的方法就這些

阿里 DataX原始碼解讀彙總

將自己datax的系列文章進行彙總形成目錄 datax 1 編譯打包使用 datax 2 通過idea搭建原始碼閱讀 除錯環境 datax 3 win環境cmd亂碼 datax 4 datax.py解讀 datax 5 改造公升級 自動識別py環境,執行datax任務 datax 6 啟動步驟解析 ...

iptables原始碼分析(2)

1.1 表的查詢 再回到iptc init 函式上來,它根據表名,從核心獲取對應的表的相關資訊,handle是乙個iptc handle t型別的指標,在libiptc.c中,有如下定義 transparent handle type.typedef struct iptc handle iptc ...

Leveldb原始碼分析 2

輕鬆一刻,前面約定中講過leveldb使用了很多varint型編碼,典型的如後面將涉及到的各種key。其中的編碼 解碼函式分為varint和fixedint兩種。int32和int64操作都是類似的。首先是fixedint編碼,直接上 很簡單明瞭。void encodefixed32 char bu...