Airflowを試してみる ~ Quick Start

AirflowのQuick Startを参考にして、WSL Ubuntu 20.04上にAirflowをインストールする。

以下はQuick Startからの抜粋。

# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=~/airflow

# Install Airflow using the constraints file
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
# For example:
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# The Standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.
# Enable the example_bash_operator dag in the home page

airflow standaloneを実行した結果、airflow.cfg ファイルが作られる。

ls airflow/  airflow.cfg  airflow.db  logs  standalone_admin_password.txt

airflow.cfg の先頭50行を抜粋。様々な変数に値が格納されている。環境変数を使用して、デフォルトの値をオーバーライドができるらしい。

You can override defaults using environment variables


# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/bluen/airflow/dags

# Hostname by providing a path to a callable, which will resolve the hostname.
# The format is "package.function".
# For example, default value "" means that result from patched
# version of socket.getfqdn() - see
# No argument should be required in the function specified.
# If using IP address as hostname is preferred, use value ````
hostname_callable =

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = utc

# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = SequentialExecutor

# This defines the maximum number of task instances that can run concurrently per scheduler in
# Airflow, regardless of the worker count. Generally this value, multiplied by the number of
# schedulers in your cluster, is the maximum number of task instances with the running
# state in the metadata database.
parallelism = 32

# The maximum number of task instances allowed to run concurrently in each DAG. To calculate
# the number of tasks that is running concurrently for a DAG, add up the number of running
# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``max_active_tasks``,
# which is defaulted as ``max_active_tasks_per_dag``.
# An example scenario when this would be useful is when you want to stop a new dag with an early
# start date from stealing all the executor slots in a cluster.
max_active_tasks_per_dag = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# The maximum number of active DAG runs per DAG. The scheduler will not create more DAG runs
# if it reaches the limit. This is configurable at the DAG level with ``max_active_runs``,
# which is defaulted as ``max_active_runs_per_dag``.
max_active_runs_per_dag = 16

# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production

airflow.cfg の中にexecutor = SequentialExecutor という記述がある。SequentialExecutorというオプションでは、タスクをシングルインスタンスで実行するらしい。SequentialExecutor 以外にもLocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor, CeleryKubernetesExecutor といったExecutorが存在する。各Executorについては、Executor Types を参照してみるとよさそう。

今回は airflow standalone コマンドでAirflowサーバを起動したため、あくまでスタンドアロン、つまり SequentialExecutor となっている模様。

ちなみに、 airflow standalone コマンドでは、以下のコマンドをまとめて実行してくれているらしい。ざっと見る限り、DBを起動してユーザ作成し、ウェブサーバを起動してスケジューラーを起動しているっぽい。



airflow db init

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \

airflow webserver --port 8080

airflow scheduler

ブラウザで http://localhost:8080/login/ にアクセスする。UsernameとPasswordは airflow standalone コマンドを実行したときに出力されていた値を利用する。以下のように出力されているはず。

standalone | Airflow is ready
standalone | Login with username: admin  password: kTBTQWBKZAP2S5Ns
standalone | Airflow Standalone is for development purposes only. Do not use this in production!


GUI上で、example_bash_operator の▶ボタンをクリックして、DAGを実行してみる。DAGが実行されていることがわかる。

DAGの詳細を見てみる。example_bash_operator は何をやっているのだろう?よくわからない。。。