Airflowの要点っぽいところ
AirflowではワークフローをDAGと呼ぶ。DAGは複数のTaskからなる。
Taskが様々な処理を実行し、DAGが各Taskの実行順序を定義する。
TaskにはOperartorやSensorといった種類がある。
なので、関係性は次のようなイメージ。
- DAG
- Tasks (OperatorやSensor)
Operator
には例えばbashコマンドを扱うためのBashOperator
やPython関数を呼ぶためのPythonOperator
が存在する。
HookはOperatorの構成要素で、外部プラットフォームのAPIを簡単に利用できるようにしてくれる。例えば、BigQueryの細かいAPI仕様を知らなくても、BigQuery用に用意されたBigquery Hookを使用すれば、用意された関数を使用してテーブル作成といった処理を簡単に記述できる(後述)。
Sensorは、イベントが発生したことを検知するまで待機する。例えばファイルが作成されたら、処理を開始するといったことができる。
DAGとTaskの定義
以下の例(bq.py
)では、BigQueryに空のテーブルを作成する。with DAG(...) as dag:
でDAGを宣言し、with文の中にcreate_bq_table
というTaskを配置している。
create_bq_table
というTaskでは、BQテーブルを作成するcreate_bq_table()
関数を呼んでいる。create_bq_table()
関数では、BigQueryHook()
というHookを使用して、create_empty_table()
メソッドにより空のテーブルを作成する。BigQueryHook()
には、様々なメソッドが用意されている。
BigQueryHook()
ではGCP上のBigQueryを操作するので、GCPの認証情報を渡す必要がある。Airflowでは外部プラットフォームの認証情報定義をConnectionと呼ぶ。bq.pyではgcp_conn_id='google_cloud_default'
でBigQueryHook()
で使用する認証情報を指定している。ConnectionはAirflow Web UIを使用して定義できる(参考)
''' BigQueryに空のテーブルを作成する。 ''' from datetime import datetime from airflow.models import DAG from airflow.operators.python import PythonOperator from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook def create_bq_table(): bigqueryhook: BigQueryHook = BigQueryHook(gcp_conn_id='google_cloud_default') project_id = "dev-gcplab-01" dataset_id = "devbqds01" table_id = "emp_salary" schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ] bigqueryhook.create_empty_table( project_id = project_id, dataset_id = dataset_id, table_id = table_id, schema_fields = schema_fields, ) with DAG( 'bq01', start_date=datetime(2023, 10, 8), schedule='@daily', # 他のインターバルプリセットはこちら。https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html#dag-runs catchup=False # リトライしない。 ) as dag: create_bq_table = PythonOperator( task_id = "create_table", python_callable = create_bq_table, )
DAGの定義
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#declaring-a-dag
DAGの定義方法は以下の3種類がある。
- コンテキストマネージャを使うパターン
- コンストラクターを使うパターン
- デコレータを使うパターン
コンテキストマネージャを使うパターン
from airflow import DAG with DAG( dag_id="my_dag_name", start_date=datetime.datetime(2021, 1, 1), schedule="@daily", ): EmptyOperator(task_id="task")
コンストラクターを使うパターン
from airflow import DAG my_dag = DAG( dag_id="my_dag_name", start_date=datetime.datetime(2021, 1, 1), schedule="@daily", ) EmptyOperator(task_id="task", dag=my_dag)
デコレータを使うパターン
from airflow.decorators import dag @dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily") def generate_dag(): EmptyOperator(task_id="task") generate_dag()
Airflowをローカル上で試す
ローカル上でAirflowを試すには、Composer ローカル開発 CLI ツールを使用するが楽。
GCPが提供しているAirflow実行環境サービスにCloud Composer
というのがあるが、GCPはComposer
をローカルで使用できるツールを用意してくれている(ありがたや)。ツールでなにをしているかというと、DockerコンテナとしてAirflow環境を作成・起動している。
もちろんDockerコンテナを使用してAirflow環境を構築しても良いのだけれど、面倒なので、こちらのツールを使用するのが良いと思う。
ツールを使用するために、Python バージョン 3.7 ~ 3.10
と pip
、それからGoogle Cloud CLI
をインストールしておくこと。
ツールはgitからcloneしてpipでインストールする。
$ gcloud auth application-default login $ gcloud auth login $ git clone https://github.com/GoogleCloudPlatform/composer-local-dev.git $ cd composer-local-dev $ pip install . $ composer-dev list-available-versions --include-past-releases --limit 10 $ composer-dev create --from-image-version composer-2.4.4-airflow-2.5.3 local_composer_env #事前にDockerを起動させておくこと! $ composer-dev start local_composer_env # composer-dev restart local_composer_env
composer-dev start local_composer_env
でAirflow環境を起動すると、次のようなメッセージが表示される。
Started local_composer_env environment. 1. You can put your DAGs in C:\Users\xxxxx\Documents\Airflow\composer-local-dev\composer\local_composer_env\dags 2. Access Airflow at http://localhost:8080
Airflow Webサーバーも起動されているので、ブラウザを開いてhttp://localhost:8080
にアクセスするとAirflow Web UIにアクセスできる。
作成した.py
ファイルはC:\Users\xxxxx\Documents\Airflow\composer-local-dev\composer\local_composer_env\dags
に保存すると、Airflow Web UIが定期的に読み込んでくれる。
Connection
先ほど作成したbq.py
をAirflowで実行してみたいので、まずbq.py
をAirflowが読み込んでくれるディレクトリに格納する。
さらに、GCPでの操作を実行するにはサービスアカウントの認証情報が必要になるので、Connectionを設定していく。
GCP上でサービスアカウントを作成し、サービスアカウントキーを作成する。サービスアカウントにはComposer ワーカー
ロールとBigQuery ユーザー
ロールを割り当てておく。
作成したサービスアカウントキーをdocker cp
コマンドでAirflowコンテナ内にコピーする。ファイルが配置されたことをコンテナに入って確認するには、docker exec -it composer-local-dev-local_composer_env /bin/bash
とする。
> docker cp [ローカルのサービスアカウントキーファイル] composer-local-dev-local_composer_env:/home/airflow/sa_key.json
準備ができたので、サービスアカウントキーをConnectionに設定する。
Airflow Web UI上で、Admin -> Connections を選択する。 google_cloud_default
というConnectionを編集し、Keyfile Pathにサービスアカウントキーのパスを指定する。
Test
ボタンで接続できることを確認したら、Save
する。
これでConnectionの設定は完了。本番環境では、Connection情報はGCPのSecret Manager上で管理すべきだが、ローカルでのテストなので上記の方法で設定した。
DAGの手動実行
DAGを実行する準備ができたので、実行してみる。但し、BigQuery上にデータセットdev-gcplab-01.devbqds01
を事前に作成しておくこと。
Airflow Web UI上で、DAGsビューでbq01の再生ボタンをクリックし、Trigger DAGをクリックしてDAGを実行する。
実行してしばらくすると、DAGのステータスがsuccess
になる。失敗した場合はerror
となる。
error
になった場合は、bq01のGraghビューに遷移し、エラーとなったタスクのLogを表示して、エラー原因を確認できる。
今回はここまで。
次回はタスクの実行順序やAirflowのアーキテクチャなどについてもまとめたい。