Atsushi2022の日記

データエンジニアリングに関連する記事を投稿してます

Airflowを試してみる

Airflowの要点っぽいところ

AirflowではワークフローをDAGと呼ぶ。DAGは複数のTaskからなる。

Taskが様々な処理を実行し、DAGが各Taskの実行順序を定義する。

TaskにはOperartorやSensorといった種類がある。

なので、関係性は次のようなイメージ。

  • DAG
    • Tasks (OperatorやSensor)

Operatorには例えばbashコマンドを扱うためのBashOperatorPython関数を呼ぶためのPythonOperatorが存在する。

HookはOperatorの構成要素で、外部プラットフォームのAPIを簡単に利用できるようにしてくれる。例えば、BigQueryの細かいAPI仕様を知らなくても、BigQuery用に用意されたBigquery Hookを使用すれば、用意された関数を使用してテーブル作成といった処理を簡単に記述できる(後述)。

Sensorは、イベントが発生したことを検知するまで待機する。例えばファイルが作成されたら、処理を開始するといったことができる。

DAGとTaskの定義

以下の例(bq.py)では、BigQueryに空のテーブルを作成する。with DAG(...) as dag:でDAGを宣言し、with文の中にcreate_bq_tableというTaskを配置している。

create_bq_tableというTaskでは、BQテーブルを作成するcreate_bq_table()関数を呼んでいる。create_bq_table()関数では、BigQueryHook()というHookを使用して、create_empty_table()メソッドにより空のテーブルを作成する。BigQueryHook()には、様々なメソッドが用意されている。

BigQueryHook()ではGCP上のBigQueryを操作するので、GCPの認証情報を渡す必要がある。Airflowでは外部プラットフォームの認証情報定義をConnectionと呼ぶ。bq.pyではgcp_conn_id='google_cloud_default'BigQueryHook()で使用する認証情報を指定している。ConnectionはAirflow Web UIを使用して定義できる(参考

'''
BigQueryに空のテーブルを作成する。
'''
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


def create_bq_table():
    bigqueryhook: BigQueryHook = BigQueryHook(gcp_conn_id='google_cloud_default')

    project_id      = "dev-gcplab-01"
    dataset_id      = "devbqds01"
    table_id        = "emp_salary"
    schema_fields   = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]

    bigqueryhook.create_empty_table(
        project_id      = project_id,
        dataset_id      = dataset_id,
        table_id        = table_id,
        schema_fields   = schema_fields,
    )

with DAG(
    'bq01', 
    start_date=datetime(2023, 10, 8), 
    schedule='@daily', # 他のインターバルプリセットはこちら。https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html#dag-runs
    catchup=False # リトライしない。
) as dag:
    create_bq_table = PythonOperator(
        task_id = "create_table",
        python_callable = create_bq_table,
    )

DAGの定義

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#declaring-a-dag

DAGの定義方法は以下の3種類がある。

  • コンテキストマネージャを使うパターン
  • コンストラクターを使うパターン
  • デコレータを使うパターン

コンテキストマネージャを使うパターン

from airflow import DAG

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")

コンストラクターを使うパターン

from airflow import DAG

my_dag = DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)

デコレータを使うパターン

from airflow.decorators import dag

@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

Airflowをローカル上で試す

ローカル上でAirflowを試すには、Composer ローカル開発 CLI ツールを使用するが楽。

GCPが提供しているAirflow実行環境サービスにCloud Composerというのがあるが、GCPComposerをローカルで使用できるツールを用意してくれている(ありがたや)。ツールでなにをしているかというと、DockerコンテナとしてAirflow環境を作成・起動している。

もちろんDockerコンテナを使用してAirflow環境を構築しても良いのだけれど、面倒なので、こちらのツールを使用するのが良いと思う。

ツールを使用するために、Python バージョン 3.7 ~ 3.10pip、それからGoogle Cloud CLIをインストールしておくこと。

ツールはgitからcloneしてpipでインストールする。

$ gcloud auth application-default login 
$ gcloud auth login
$ git clone https://github.com/GoogleCloudPlatform/composer-local-dev.git
$ cd composer-local-dev
$ pip install .
$ composer-dev list-available-versions --include-past-releases --limit 10
$ composer-dev create --from-image-version composer-2.4.4-airflow-2.5.3 local_composer_env #事前にDockerを起動させておくこと!
$ composer-dev start local_composer_env # composer-dev restart local_composer_env

composer-dev start local_composer_envでAirflow環境を起動すると、次のようなメッセージが表示される。

Started local_composer_env environment.

1. You can put your DAGs in C:\Users\xxxxx\Documents\Airflow\composer-local-dev\composer\local_composer_env\dags
2. Access Airflow at http://localhost:8080

Airflow Webサーバーも起動されているので、ブラウザを開いてhttp://localhost:8080にアクセスするとAirflow Web UIにアクセスできる。

作成した.pyファイルはC:\Users\xxxxx\Documents\Airflow\composer-local-dev\composer\local_composer_env\dagsに保存すると、Airflow Web UIが定期的に読み込んでくれる。

Connection

先ほど作成したbq.pyをAirflowで実行してみたいので、まずbq.pyをAirflowが読み込んでくれるディレクトリに格納する。

さらに、GCPでの操作を実行するにはサービスアカウントの認証情報が必要になるので、Connectionを設定していく。

GCP上でサービスアカウントを作成し、サービスアカウントキーを作成する。サービスアカウントにはComposer ワーカーロールとBigQuery ユーザーロールを割り当てておく。

作成したサービスアカウントキーをdocker cpコマンドでAirflowコンテナ内にコピーする。ファイルが配置されたことをコンテナに入って確認するには、docker exec -it composer-local-dev-local_composer_env /bin/bashとする。

> docker cp [ローカルのサービスアカウントキーファイル] composer-local-dev-local_composer_env:/home/airflow/sa_key.json

準備ができたので、サービスアカウントキーをConnectionに設定する。

Airflow Web UI上で、Admin -> Connections を選択する。 google_cloud_defaultというConnectionを編集し、Keyfile Pathにサービスアカウントキーのパスを指定する。

Testボタンで接続できることを確認したら、Saveする。

これでConnectionの設定は完了。本番環境では、Connection情報はGCPのSecret Manager上で管理すべきだが、ローカルでのテストなので上記の方法で設定した。

DAGの手動実行

DAGを実行する準備ができたので、実行してみる。但し、BigQuery上にデータセットdev-gcplab-01.devbqds01を事前に作成しておくこと。

Airflow Web UI上で、DAGsビューでbq01の再生ボタンをクリックし、Trigger DAGをクリックしてDAGを実行する。

実行してしばらくすると、DAGのステータスがsuccessになる。失敗した場合はerrorとなる。

errorになった場合は、bq01のGraghビューに遷移し、エラーとなったタスクのLogを表示して、エラー原因を確認できる。

今回はここまで。

次回はタスクの実行順序やAirflowのアーキテクチャなどについてもまとめたい。