排程系統Airflow的第乙個DAG

2022-01-19 21:30:08 字數 4228 閱讀 3077

考慮了很久,要不要記錄airflow相關的東西, 應該怎麼記錄. 官方文件已經有比較詳細的介紹了,還有各種部落格,我需要有乙份自己的筆記嗎?

答案就從本文開始了.

本文將從乙個陌生視角開始認知airflow,順帶勾勒出應該如何一步步搭建我們的資料排程系統.

現在是2023年9月上旬, airflow最近的乙個版本是1.10.5.

ps. 查資料發現自己好多文章被爬走,換了作者.所以,接下裡的內容會隨機新增一些防偽標識,忽略即可.

什麼資料排程系統?

中臺這個概念最近比較火, 其中就有乙個叫做資料中臺, 文章資料中臺到底是什麼給出了乙個概念.

我粗糙的理解, 大概就是: 收集各個零散的資料,標準化,然後服務化, 提供統一資料服務. 而要做到資料整理和處理,必然涉及資料排程,也就需要乙個排程系統.[本文出自ryan miao]

資料排程系統可以將不同的異構資料互相同步,可以按照規劃去執行資料處理和任務排程. airflow就是這樣的乙個任務排程平台.

前面airflow1.10.4介紹與安裝已經

安裝好了我們的airflow, 可以直接使用了. 這是第乙個dag任務鏈.

目標: 每天早上8點執行乙個任務--列印hello world

在linux上,我們可以在crontab插入一條記錄:

使用springboot, 我們可以使用@scheduled(cron="0 0 8 * * ?")來定時執行乙個method.

使用quartz, 我們可以建立乙個crontrigger, 然後去執行對應的jobdetail.

crontrigger trigger = (crontrigger)triggerbuilder.newtrigger()

.withidentity("trigger1", "group1")

.withschedule(cronschedulebuilder.cronschedule("0 0 8 * * ?"))

.build();

使用airflow, 也差不多類似.

在docker-airflow中,我們將dag掛載成磁碟,現在只需要在dag目錄下編寫dag即可.

volumes:

- ./dags:/usr/local/airflow/dags

建立乙個hello.py

"""

airflow的第乙個dag

"""from airflow import dag

from airflow.operators.bash_operator import bashoperator

from datetime import datetime

default_args =

dag = dag("hello-world",

description="第乙個dag",

default_args=default_args,

schedule_interval='0 8 * * *')

t1 = bashoperator(task_id="hello", bash_command="echo 'hello world, today is }'", dag=dag)

這是乙個python指令碼, 主要定義了兩個變數.

dag

表示乙個有向無環圖,乙個任務鏈, 其id全域性唯一. dag是airflow的核心概念, 任務裝載到dag中, 封裝成任務依賴鏈條. dag決定這些任務的執行規則,比如執行時間.這裡設定為從9月1號開始,每天8點執行.

task

task表示具體的乙個任務,其id在dag內唯一. task有不同的種類,通過各種operator外掛程式來區分任務型別. 這裡是乙個bashoperator, 來自airflow自帶的外掛程式, airflow自帶了很多拆箱即用的外掛程式.

ds

airflow內建的時間變數模板, 在渲染operator的時候,會注入乙個當前執行日期的字串. 後面會專門講解這個執行日期.

[本文出自ryan miao]

將上述hello.py上傳到dag目錄, airflow會自動檢測檔案變化, 然後解析py檔案,匯入dag定義到資料庫.

訪問airflow位址,重新整理即可看到我們的dag.

開啟dag, 進入dag定義, 可以看到已經執行了昨天的任務.

點選任務例項, 點選view log可以檢視日誌

我們的任務在這台機器上執行,並列印了hello, 注意, 這個列印的日期.

這樣就是乙個基本的airflow任務單元了, 這個任務每天8點會執行.

定義乙個任務的具體內容,比如這裡就是列印hello world,today is }.

任務設定了執行時間,每次執行時會生成乙個例項,即 dag-task-executiondate 標記乙個任務例項.任務例項和任務當前代表的執行時間繫結. 本demo中,每天會生成乙個任務例項.

今天是2019-09-07, 但我們日誌裡列印的任務執行日期是2019-09-06.

執行日期是任務例項執行所代表的任務時間, 我們通常叫做execute-date或bizdate, 類似hive表的的分割槽.

為什麼今天執行的任務,任務的時間變數是昨天呢?

因為任務例項是乙個時間段的任務, 比如計算每天的訪問量, 我們只有6號這一天過去了才能計算6號這一天的的總量. 那這個任務最早要7號0點之後才能計算, 計算6號0點到7號0點之間的訪問量.所以,這個任務時間就代表任務要處理的資料時間, 就是6號. 任務真正執行時間不固定的, 可以7號, 也可以8號, 只要任務執行計算的資料區間是6號就可以了.

因此, 排程系統中的ds(execution date)通常是過去的乙個週期, 即本週期執行上週期的任務.

最典型的任務模型etl(extract & transformation & loading,即資料抽取,轉換,載入)最少也要分成3步. 對於每天要統計訪問量這個目標來說, 我必須要抽取訪問日誌, 找到訪問量的字段, 計算累加. 這3個任務之間有先後順序,必須前乙個執行完畢之後,後乙個才可以執行. 這叫任務依賴. 不同的任務之間的依賴.在airflow裡, 通過在關聯任務實現依賴.

還有同乙個任務的時間依賴. 比如,計算新增使用者量, 我必須知道前天的資料和昨天的資料, 才能計算出增量. 那麼, 這個任務就必須依賴於昨天的任務狀態. 在airflow裡,通過設定depends_on_past來決定.

airflow裡有個功能叫backfill, 可以執行過去時間的任務. 我們把這個操作叫做補錄或者補數,為了計算以前沒計算的資料.

我們的任務是按時間執行的, 今天建立了乙個任務, 計算每天的使用者量, 那麼明天會跑出今天的資料. 這時候,我想知道過去1個月每天的使用者增量怎麼辦?

自己寫code, 只要查詢日期範圍的資料,然後分別計算就好. 但排程任務是固定的, 根據日期去執行的. 我們只能建立不同日期的任務例項去執行這些任務. backfill就是實現這種功能的.

讓跑過的任務再跑一次.

在airflow裡, 通過點選任務例項的clear按鈕, 刪除這個任務例項, 然後排程系統會再次建立並執行這個例項.

關於排程系統這個實現邏輯, 我們後面有機會來檢視原始碼了解.

本文沒太實質性的任務具體介紹, 而是引出hello world, 先跑起來,我們接下來繼續完善我們的dag.

python第乙個程式設計 第乙個 Python 程式

簡述 安裝完 python 後,windows 中 開始選單或安裝目錄下就會有 idle 開發 python 程式的基本 ide 整合開發環境 幫助手冊 模組文件等。linux 中 只需要在命令列中輸入 python 命令即可啟動互動式程式設計。互動式程式設計 互動式程式設計不需要建立指令碼檔案,是...

第乙個部落格

我不知道為什麼 我在csdn上創了乙個賬號,又開通了部落格。也許我不是名人,也許幻想著成為名人。在這裡 我不會給任何人許諾,這個部落格可能有乙個博文 有兩個博文 或者會有很多 很多 很多。不過讓我有個大膽的猜想,如果這個部落格在今後有很多很多自己寫的博文,說明我成功了 在自己眼裡 也說明這個方法時正...

第乙個爬蟲

很多人學習python的目的就是為了學習能夠實現爬蟲的功能,這裡,我使用了scrapy框架來實現了乙個簡單的爬蟲功能,這裡我簡單的介紹一下scrapy專案的建立,和執行。1,第一步是安裝scrapy,我相信到了這一步,大多數人都已經會安裝第三方庫檔案了,這裡主要是使用命令pip install sc...