Airflow核心概念

2021-10-12 12:43:27 字數 3587 閱讀 9670

airflow核心概念

概念描述

dags

dag即有向無環圖,將所有需要執行的tasks按照依賴關係組織起來,描述的是所有tasks執行的順序。

dag runs

dag runs是dag的乙個物理例項,表示dag的一次執行(狀態),即是具有特定執行時間(execution_date)的 dag。它包含了特定execution_date執行的所有任務例項。

operators

可以簡單理解為乙個class,描述了dag中乙個具體的task具體要做的事。其中,airflow內建了很多operators,如bashoperator 執行乙個bash 命令,pythonoperator 呼叫任意的python 函式,emailoperator 用於傳送郵件,httpoperator 用於傳送http請求, sqloperator 用於執行sql命令...同時,使用者可以自定義operator,這給使用者提供了極大的便利性。

tasks

task 是 operator的乙個例項,也就是dags中的乙個node。

task instance

任務例項,表示task的一次執行(狀態),即是具有特定執行時間(execution_date)的task。

task relationships

dags中的不同tasks之間可以有依賴關係,如 taska >> taskb,表明taskb依賴於taska。

task lifecycle

task生命週期,從task開始到完成,有很多的狀態,比如:success、running、failed、retry、no status等。

hooks

鉤子是外部平台和資料庫的介面,如hive,s3,mysql,postgres,hdfs和pig。 hooks盡可能實現通用介面,並充當operator的構建塊。

pools

當有太多程序同時需要執行時,某些系統可能會被壓垮。 airflow pools可用於限制任意任務集上的併發執行。 要以在ui(選單 - >管理 - >pools)中管理pools列表,通過為pools命名並為其分配多個worker slots。 然後在建立任務時(即例項化operators),可以通過使用pools引數將task與現有pools之一相關聯。

connections

外部系統的連線資訊儲存在airflow元資料資料庫中並在ui中進行管理(menu -> admin -> connections)。在那裡定義了conn_id,並附加了主機名/登入/密碼/schema資訊。 airflow pipelines可以簡單地引用集中配置管理中的conn_id,而無需在任何地方硬編碼任何此類資訊。

許多鉤子都有預設的conn_id,使用該鉤子的操作符不需要提供顯式的連線id。例如,postgreshook的預設conn_id是postgres_default。

queues

在使用celeryexecutor時,可以指定任務傳送到的celery佇列。queue是baseoperator的乙個屬性,所以任何任務都可以分配給任何佇列。環境的預設佇列是在airflow.cfg的celery -> default_queue定義的。

這定義了未指定時分配給任務的佇列,以及在啟動時airflow workers監聽哪個佇列。workers可以監聽乙個或多個任務佇列。當乙個worker啟動時(使用命airflow worker),可以指定一組以逗號分隔的佇列名稱(例如,airflow worker -q spark)。然後,這個worker將只拾取連線到指定佇列的任務。

xcoms

xcoms允許任務間交換訊息,允許更細微的控制形式和共享狀態。該名稱是「cross-communication」的縮寫。xcoms主要由乙個key,value和timestamp所定義,但也跟蹤建立xcom的task/dag,以及何時應該可見的屬性。任何可以被pickled的物件都可以用作xcom值,因此使用者應該確保使用適當大小的物件。

xcoms支援「推」(傳送)或「拉」(接收)的方式處理訊息。 當任務推送xcom時,它通常可用於其他任務。 任務可以通過呼叫xcom_push()方法隨時推送xcoms。 此外,如果任務返回乙個值(來自其operator的execute()方法,或者來自pythonoperator的python_callable函式),則會自動推送包含該值的xcom。

tasks可以呼叫xcom_pull()來檢索xcoms,可選地根據key、source task_ids和source dag_id等條件應用過濾器。 預設情況下,xcom_pull()會過濾出從執行函式返回時被自動賦予xcom的鍵(與手動推送的xcom相反)。

如果為task_ids傳遞xcom_pull單個字串,則返回該任務的最新xcom值; 如果傳遞了task_ids列表,則返回相應的xcom值列表。

variables

變數是將任意內容或配置作為乙個key/value簡單鍵值儲存的通用方法。 可以從ui(admin -> variables),**或cli列出,建立,更新和刪除變數。 此外,json配置檔案可以通過ui批量上傳。 雖然pipeline**定義和大多數常量和變數應該在**中定義並儲存在源**控制中,但是通過ui可以訪問和修改某些變數或配置項會很有用。

branching

有時您需要乙個工作流分支,或者只根據任意條件走下某條路徑,這通常與上游任務中發生的事情有關。 一種方法是使用branchpythonoperator。

branchpythonoperator與pythonoperator非常相似,只是它需要乙個返回task_id的python_callable。 返回task_id,並跳過所有其他路徑。 python函式返回的task_id必須直接引用branchpythonoperator任務下游的任務。

subdags

subdag非常適合重複模式。 在使用airflow時,定義乙個返回dag物件的函式是乙個很好的設計模式。詳見官網說明。

slas

服務水平協議或任務或dag應成功完成的時間,可以在任務級別上設定為時間增量。如果乙個或多個例項到那時還沒有成功,則會傳送警報電子郵件,詳細說明錯過sla的任務列表。事件也被記錄在資料庫中,並在瀏覽->sla下的web ui中可用,在這裡事件可以被分析和記錄。

trigger rules

雖然正常的工作流行為是在所有直接上游任務成功時觸發任務,但airflow允許更複雜的依賴設定。

所有操作符都有乙個trigger_rule引數,該引數定義了觸發所生成任務的規則。trigger_rule的預設值是all_success,可以定義為「當所有直接上游任務成功時觸發此任務」。這裡描述的所有其他規則都是基於直接父任務,並且是在建立任務時可以傳遞給任何操作符的值:all_success、all_failed、all_done、one_failed、one_success等。

詳見官網說明。

latest run only

標準的工作流行為包括在特定的日期/時間範圍內執行一系列任務。然而,有些工作流執行的任務與執行時無關,但需要按照計畫執行,這與標準的cron作業非常相似。在這些情況下,回填或執行在暫停期間錯過的作業只會浪費cpu週期。

對於這種情況,您可以使用latestonlyoperator跳過在dag的最近計畫執行期間未執行的任務。 如果現在的時間不在其execution_time和下乙個計畫的execution_time之間,則latestonlyoperator將跳過所有直接下游任務及其自身。

更詳細的說明,請見官網說明:

Airflow 中文文件 概念

airflow platform是用於描述,執行和監控工作流的工具。在airflow中,dag 或定向非迴圈圖 是您要執行的所有任務的集合,以反映其關係和依賴關係的方式進行組織。例如,乙個簡單的dag可以包含三個任務 a,b和c.可以說a必須在b可以執行之前成功執行,但c可以隨時執行。它可以說任務a...

Airflow學習之路一 概念

dag runs operators tasks task instances dag是乙個由n n 1 n geqslant1 n 1 個task構成的有向無環圖。它記錄了任務之間的邏輯關係,排程時間,任務狀態等等。以字典的形式將引數傳入dag中。官方文件1 中將引數統一放置於default ar...

Airflow安裝部署

新聞資訊是通過爬蟲獲取,使用scrapy框架進行爬蟲任務 使用airflow工作流監控平台對爬蟲任務進行管理 監控 可使用celeryexecutor分布式,也可使用localexecutor多程序進行資料採集 以下主要是對airflow的安裝和配置。目前使用的系統環境為centos linux r...