ブログ

Airflow で異なる DAG 間の依存関係を設定する方法

こんにちは。ソフトウェアエンジニアの冨田です。
FLYWHEEL Advent Calendar 2019 の15日目は、Apache Airflow で ExternalTaskSensor を使い異なる DAG に属するタスク間に依存関係を設定する方法を紹介します。
最初にAirflowの紹介は簡単にするものの、Airflow 使用したことがある読者を想定しています。

Apache Airflow について

フライウィールが開発しているデータプラットフォームではデータパイプラインの管理に Apache Airflow を採用しています。
Airflow は Python で定義されたワークフローの管理やスケジューリングを行うためのプラットフォームで、GCP では Cloud Composer というマネージドのサービスが提供されています。
Airflow ではワークフロー内のタスクの依存関係を DAG (Directed Acyclic Graph, 有向非巡回グラフ) と呼ばれる閉路を持たない有向グラフで定義することができます。定義した DAG は定期的に実行するようスケジュールを設定したり、Web View から実行状況を確認、必要であれば再実行するなどの処理を行うことができます。
例えば下記のようなDAGを実装すると、タスクCはタスクAとタスクBが成功した場合にのみ実行されるようになります。

DAG 間の依存関係

一つの DAG の肥大化を防ぐために、私のプロジェクトでは DAG を役割ごとに複数個定義していました。日々追加される商品データを処理するための DAG、ユーザーのウェブサイト上での行動ログを処理するための DAG、受注データを処理するための DAG、といったイメージです。
しかし、新機能の開発を進めていく上で、複数の DAG 間に依存関係を設定したいケースが出てくるようになりました。状況の一例を図で示すと次のようになります。DAG-1 のタスクCの終了を待ってから、DAG-2 のタスクEを実行したい、つまり赤い点線のような依存関係を設定したい状況です。

依存関係は通常は同じ DAG 内のタスクに対して設定するのですが、このような DAG を超えた依存関係はどのように設定できるのでしょうか。

ExternalTaskSensor の活用

異なる DAG に存在するタスクに依存関係を設定するには、ExternalTaskSensor を活用することができます。ExternalTaskSensor では  external_dag_id  external_task_id  を指定することができ、指定されたタスクのステータスを定期的1に確認します。指定されたタスクの状態が  allowed_status  に含まれるいずれかの状態に遷移した時、ExternalTaskSensor 自体も success ステータスとなります。
各パラメータの細かい定義はドキュメントを参照していただきたいのですが、具体的な使用例は下記のようになります。先ほど紹介した DAG-1 を午前10時、DAG-2 を午前10時1分に実行する場合は次のような ExternalTaskSensor を定義すれば良いことになります。なお、動作検証等は Airflow Version 1.10.6 で行っています。

# dag1.py
import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
DAG_ID = "dag_1"
DEFAULT_ARGS = {'start_date': datetime.datetime(2019, 12, 15)}
with DAG(DAG_ID, default_args=DEFAULT_ARGS,
     schedule_interval="0 10 * * *") as dag:
  task_a = DummyOperator(task_id='task_a')
  task_b = DummyOperator(task_id='task_b')
  task_c = DummyOperator(task_id='task_c')
  [task_a, task_b] >> task_c
# dag2.py
import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
DAG_ID = "dag_2"
DEFAULT_ARGS = {'start_date': datetime.datetime(2019, 12, 15)}
with DAG(DAG_ID, default_args=DEFAULT_ARGS,
     schedule_interval="01 10 * * *") as dag:
  task_d = DummyOperator(task_id='task_d')
  task_e = DummyOperator(task_id='task_e')
  task_c_sensor = ExternalTaskSensor(
    task_id='task_c_sensor',
    external_dag_id='dag_1',
    external_task_id='task_c',
    allowed_states=['success'],
    execution_delta=datetime.timedelta(minutes=1))
[task_d, task_c_sensor] >> task_e

実際にこの2つの DAG を設定すると、タスクCの終了を待ってからタスクEが実行される様子が確認できます。

ExternalTaskSensor の注意点

いままで ExternalTaskSensor を使うと異なる DAG に対して依存関係を設定できることを見てきました。
個人的に ExternalTaskSensor を使う上で注意した方が良いと感じるのは、依存関係が Web View で明示的に可視化されない点です。通常、同一 DAG 内の依存関係は Airflow の Graph View で確認できるのですが、ExternalTaskSensor の依存関係はここでは可視化されません。
ExternalTaskSensor で複雑な依存関係を設定しすぎると、依存関係に気づかないで DAG を更新してしまい思わぬ事故が発生する可能性もあるので、ExternalTaskSensor の使用は必要最低限に止めるべきでしょう。

おまけ: XCOM で値も受け渡す

依存関係だけでなく、異なる DAG で計算した値を受け取って使用したい場面もあるかもしれません。先ほどの例でいうと、タスクCからタスクEに何か値を渡したいとします。
このような状況では XCOM を使用することができます。XCOM は cross-communication を意味し異なるタスク間でメッセージをやり取りするために用意された手段です。
公式ドキュメントに詳しく説明されているように、タスクは key-value ペアを XCOM に push したり、別のタスクが push した値 pull することができます。
BashOperator の場合、 xcom_push=True  というオプションを指定すると、 キーが “return_value” で値が標準出力の最後の行という key-value ペアを push してくれます。例えば下記のようにすれば、 “hello world!” という文字列を XCOM に push できます。

task_c = BashOperator(task_id='task_c',
             bash_command='sleep 120 && echo "hello world!"',
             xcom_push=True)

次にタスクEではこの値を pull します。pull は TaskInstance オブジェクトから行うことができ、マクロを使うと  {{ ti.xcom_pull(…) }}  といった形で使用できます。
タスクEで先ほどタスクCから push した値を pull する場合は下記のようになります。

task_e = BashOperator(
  task_id='task_e',
  bash_command=
  'echo "recieved: {{ ti.xcom_pull(dag_id="dag_1", task_ids="task_c", include_prior_dates=True) }}"'
)

 include_prior_dates=True  となっているのがポイントで、デフォルトの値である  include_prior_dates=False  の場合、同じ  execution_date  を持つ DAG からしか値を pull することができません。
今回の例のように  execution_date  が古い DAG (過去にトリガーされた DAG) から値を pull したい場合は、 include_prior_dates=True  とすると直近に実行された DAG に含まれるタスクから値を pull してきてくれます。

まとめ

今回は ExternalTaskSensor や XCOM を使用して異なる DAG 間で依存関係の設定や値の受け渡しを行う方法を紹介しました。
基本的には一つの DAG 内で依存関係が完結することが望ましいとは思いますが、どうしても必要な場合は ExternalTaskSensor の使用も検討してみてはいかがでしょうか。
フライウィールではこのような技術を使い、データプラットフォームを一緒に開発してくれるエンジニアを募集しています。我こそは!と思う方はぜひオフィスに遊びに来てみてください。

Notes


  1. デフォルトの設定では BaseSensorOperatorpoke_interval の初期値である60秒ごと