用Crontab打造簡易工作流引擎

2022-07-21 03:09:11 字數 2584 閱讀 2581

眾所周知,oozie(1, 2)是基於時間條件與資料生成來做工作流排程的,但是oozie的資料觸發條件只支援hdfs路徑,故而面臨著這樣的問題:

因此,靈活可擴充套件的工作流引擎才是正確姿勢!下面,我將介紹如何用crontab來打造乙個類似於oozie的簡易工作流引擎;對標oozie,其應滿足功能:

判斷hive partition是否已存在,思路比較簡單——show partitions後能否grep到該partition:

# check wheter $1's partition ($2) exists

hive_partition_exists() " | grep $

[ $? == 0 ]

}

獲取hive 表的最後乙個partition,grep命令配合正規表示式中的lookahead匹配:

# get latest hive partition

latest_hive_partition() " | tail -1 | grep -po "(?<=$=).*"

}

在檢查es index是否寫入完成時,可用思路——定時flush index,然後判斷當前時刻的doc數較上一時刻是否發生變化;若變化,則說明正在寫入。shell指令碼處理json太蛋疼了,故不給出**啦。

所謂「條件輪詢」,是指如果資料未生成,則會一直輪詢該條件是否滿足。我們採用while迴圈中sleep的方式來實現條件輪詢:

hive_partition_exists etl.ad_tb1 $

ad1_exists=$?

hive_partition_exists etl.ad_tb2 $

ad2_exists=$?

while (( $ != 0 || $ != 0))

do echo "`date -d "now"`: log partitions $ not exist, and waiting" >> $

sleep 1m

hive_partition_exists etl.ad_tb1 $

ad1_exists=$?

hive_partition_exists etl.ad_tb2 $

ad2_exists=$?

done

接下來,以hive寫elasticsearch的為例,說明如何用crontab做定時hive任務。hiveql指令碼如下:

add jar /path/to/jars/elasticsearch-hadoop-2.3.1.jar;

set mapred.job.name=ad_tag-$~~$;

set hive.map.aggr = false;

insert overwrite table ad_tag

select media, a.dvc as dvc, case when c1_arr is null then array('empty') else c1_arr end as c1_arr, '$' as week_time

from (

from ad_log

where is_exposure = '1' and day_time between date_sub('$', 6) and '$'

) a

left outer join (

select dvc, collect_set(c1) as c1_arr

from tag

lateral view inline(tag) in_tb

where day_time = '$'

group by dvc

) bon a.dvc = b.dvc;

為了實現任務的並行執行,我用到linux命令中的&

log_partition=`date -d "5 day ago" "+%y-%m-%d"`

tag_partition=$(latest_hive_partition tag.dmp_tag day_time)

log_path="$.log"

echo "`date -d "now"`: log partitions $ exist" >> $

echo "`date -d "now"`: latest tag partition $" >> $

hive -f ad_tag1.hql --hivevar log_partition=$ --hivevar tag_partition=$ & hive -f ad_tag2.hql --hivevar log_partition=$ --hivevar tag_partition=$

exit 1

ps: 當手動執行指令碼是ok的,但是crontab去執行時卻出錯,最可能的原因是crontab未能正確載入使用者的環境變數;故可以在執行指令碼中加入:

source /etc/profile

source /path/to/.bashrc

但是,用crontab做工作流排程,會存在如下問題:

工作流建模 工作流概念

工作流建模 工作流概念 1 案例 工作流系統得基本目的是處理案例。每個案例都有乙個唯一標識,而且每個案例的生命週期都是有限的。案例生命週期都處於某個特定狀態,該狀態由三個元素組成 1 案例相關的屬性的值 案例屬性是一系列同案例相關的變數。能夠用來管理案例。正是通過這些變數,才有可能指出在特定條件下某...

工作流 一 什麼是工作流

什麼是工作流 工作流的英文全稱是 workflow,簡單理解則是業務流程的計算機化或自動化。它是是針對工作中具有固定程式的常規活動而提出的乙個概念,通過將工作活動分解定義良好的任務 角色 規則和過程來進行執行和監控,達到提高生產組織水平和工作效率的目的。工作流技術發端於70年代中期辦公自動化領域的研...

知識體系 打造寫作工作流

我想將我的學習過程全部記錄下來,技術,工作,生活,還是思維片段,所有能記的都要記下來,終生學習這個理念不單要植入自己的腦子還要形成肌肉記憶。當然記錄這件事情也一直在做,但是做得並不好,單純的記錄其實意義不大,如果能分享出去,並因此而獲取一些正向的反饋,然後再激勵自己去學習 記錄 分享 獲得正向反饋形...