Airflowは、ワークフローをプログラムで作成、スケジュール、および監視するためのプラットフォームです。仕事でAirflowを採用することにしたので、今回はAirflowをローカルPC上で動かして、DockerOperatorで任意のDockerコンテナを実行しつつ、必要な要素(DAG実行時のパラメータ指定とか環境変数渡しなど)を試していきたいと思います。

Airflowについて

公式のトップページから引っ張ってきただけですが、以下のような原則・特徴があるそうです。

原則

スケーラブル
Airflowはモジュール化されたアーキテクチャを備えていて、Messageキューを使用して任意の数のワーカーを調整でき、無限に拡張できる

動的
AirflowのパイプラインはPythonで定義されており、動的にパイプラインを生成することができる

拡張可能
独自の演算子を簡単に定義し、ライブラリを拡張して、環境に適した抽象化レベルに適合させることができる

エレガント
Airflowのパイプラインは無駄がなく明確であり、パラメータ化はJinjaテンプレートエンジンを使用してコアに組み込まれている

特徴

純粋なPython
コマンドラインやXMLの黒魔術は必要なし!標準のPython機能を使用して、スケジュールの日時形式やタスクを動的に生成するループなどを実装し、ワークフローを完全な柔軟に構築することができる

便利なUI
堅牢で最新のWebアプリケーションを介してワークフローを監視、スケジュール、および管理できる。タスクのステータスとログを常に完全にWebUIから把握できる

堅牢な統合
Airflowは、GCP、AWS、Azure、およびその他の多くのサードパーティサービスで挿入して実行可能なオペレーターを備えている。これにより、Airflowを現在のインフラストラクチャに簡単に適用し、次世代テクノロジーに拡張できる

使いやすい
Pythonの知識があれば、誰でもワークフローをデプロイできる。Apache Airflowは、パイプラインの範囲を制限しない。これにより、MLモデルの構築、データの転送、インフラストラクチャの管理などをパイプラインの中で自由に定義・実行することができる

オープンソース
改善したい場合はIssueをあげることもできるし、Pull Requestをすることもできる。作業は単純でハードルも長時間の手順も必要なし

採用理由

ワークフローエンジンはこれまで Jenkins を利用していたのですが、何でもできる反面設定が複雑で古臭い印象があり、別のワークフローエンジンに乗り換えたいと思っていました。以下の要件で乗り換え先を探し始めたのですが、Airflowが全て条件を満たしていたので採用することにしました。

  • UIからパラメータを指定してジョブを実行できる
    • => DAG実行時にJSONで指定できる。v1.10.10 で導入された
  • ジョブ実行履歴およびログをUIで確認できる
    • => 充実したWebUIが提供されており、履歴やログを見るぐらいは余裕でできる
  • ジョブを個別の独立した実行環境で実行できる
    • => DockerOperatorを使えばできる

検証

DockerでAirflowを動かす

本来はこちらを元にインストールした方が勉強になると思うのですが、今回はサクッとDockerでAirflowを動かして、さらにその中でDockerOperatorを使って任意のアプリケーションを動かせるかどうかを検証したいので、docker-airflowを利用することにしました。

dockerおよびdocker-composeをインストール

まだインストールしてない場合は、以下のサイトを参考にインストールします。

git clone

docker-airflowをgit cloneします。

git clone https://github.com/puckel/docker-airflow.git
cd docker-airflow

Dockerイメージを作成

DockerFileを元にpuckel/docker-airflow:latestというタグ名でDockerイメージを作成します。

docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .
> Step 1/25 : FROM python:3.7-slim-buster
>  ---> 9703nlsdfgego
> Step 25/25 : CMD ["webserver"]
>  ---> Running in 7sadkdbafsv
> 省略
> Running in bf7nldasferb
> Removing intermediate container we4lndfg9fg
>  ---> 8safdklje4nfgdskjg
> Successfully built 8safdklje4nfgdskjg
> Successfully tagged puckel/docker-airflow:latest

Airflowを起動

Executorとはタスクスケジューラのことで色々な選択肢があるようなのですが、今回は Celery を利用するCeleryExecutorで検証を進めようと思います。
docker-composeでAirflowを起動します。

docker-compose -f docker-compose-CeleryExecutor.yml up -d
> Starting docker-airflow_webserver_1 ... done
> Starting docker-airflow_scheduler_1 ... done
> Starting docker-airflow_flower_1    ... done
> Starting docker-airflow_worker_1    ... done
> Starting docker-airflow_postgres_1 ... done
> Starting docker-airflow_redis_1    ... done

起動してみるとわかるのですが、色々なコンテナが起動します。上から順に軽く説明します。

  • webserver => AirflowのWebUI。ここからDAGを実行したり実行結果を確認したりする。Flask で実装されている。
  • scheduler => タスクケジューラのCeleryを起動
  • flower => CeleryをモニタリングするためのWebUIであるFlower を起動
  • worker => タスクが実行される
  • postgres => ジョブ管理用の各種データを管理する
  • redis => ブローカー(メッセージキュー)として、schedulerとworkerを仲介する

Web UI表示

以下の画面が表示されればここまでは問題なく動いています

チュートリアルDAGを実行

http://localhost:8080 で開くAirflowのトップページには読み込んだDAGの一覧が表示されます。
デフォルトでturorial(tuto.py)が表示されていていますが、Off=>OnにtoggleすることでDAGが有効化され、過去にスケジュールされたDAG(タスクパイプライン)がまとめて実行されます。

DAG名のリンクをクリックすると詳細画面(Tree View)に遷移します。ここではDAGの定義がTree形式で左側に表示され、右側にDAGの実行履歴が表示されます。縦で一つのDAG実行になります。

■が一つのタスクになっていて、クリックすると以下の画面が表示されます。

こちらから、ログを表示したりすることができます。

あとはTrigger DAGからDAGを手動で実行することもできます。

チュートリアルDAGコードを確認

DAGのコードはCodeボタンから確認できます。プロジェクト配下のdags/tuto.pyと同一です。

コード全文です。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))

## t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)

t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id="templated",
    bash_command=templated_command,
    params={"my_param": "Parameter I passed in"},
    dag=dag,
)

t2.set_upstream(t1)
t3.set_upstream(t1)
  • default_argsでDAGレベルでのオペレータのパラメータを定義します。定義できる値はBaseOperatorのパラメータ です。こちらはあくまでデフォルト値なのでOperatorレベルで上書きすることができます。
  • DAG("tutorial", schedule_interval=timedelta(1))で1日毎にスケジュール実行する設定になっている
  • t1, t2, t3は全てBashOperatorを利用している
    • BashOperatorはwokerコンテナのlinuxサーバ内で実行されるため、必要なlinuxパッケージは、DockerFileでapt getなどで事前にインストールする
  • t2.set_upstream(t1)t3.set_upstream(t1)は、t2t3の上流がt1という設定だが少しわかりにくい
    • >>を使って実装できる。以下のどちらでもOK
    • t1 >> (t2, t3)
    • t1 >> [t2, t3]

DockerOperatorを動かす

とりあえず、tutorialのDAGを実行し内容もだいたい把握できた気がするので、次は本命のDockerOperatorを試していきたいと思います。

DAGを作成

/dags配下にdocker_bash.pyを作ります。これは、DockerOperatorでcentos:latestイメージを使って、bashコマンド(30秒waitする)を実行するDAGです。

内容は見れば分かる様な内容だと思います。一点補足すると、docker_url="unix://var/run/docker.sock"はデフォルト値そのままなのにあえて書いてます。これは、DockerデーモンのUnixソケットパスで、これを通じてDockerコンテナの起動などを行います。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 11, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG("docker_bash_sample", default_args=default_args, schedule_interval=timedelta(1)) as dag:
    t1 = BashOperator(task_id="print_start", bash_command="echo $(date '+%Y-%m-%d %H:%M:%S') started.")

    t2 = DockerOperator(
        task_id='docker_command',
        image='centos:latest',
        api_version='auto',
        auto_remove=True,
        command="""/bin/bash -c \'echo "1" && sleep 30 && echo "2"\'""",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
    )

    t3 = BashOperator(task_id="print_finish", bash_command="echo $(date '+%Y-%m-%d %H:%M:%S') finished.")
    t1 >> t2 >> t3

トラブル対応

AirflowのWebUIをブラウザでreloadするだけで新しいDAGが表示されますので、そこからDAGを実行してサクッと動けばいいのですが、色々とエラーが発生しますので、対応していく必要があります。

dockerパッケージが見つからない

Broken DAG: [/usr/local/airflow/dags/docker_bash.py] No module named 'docker'

Dockerfileでpythonのdockerパッケージをインストールします。

RUN pip install docker

さらに、docker-compose-CeleryExecutor.ymlを修正します。実はここまではせっかく作ったAirflowのDockerイメージpuckel/docker-airflow:latestではなく、Docker Registryからダウンロードしたpuckel/docker-airflow:1.10.9を利用していました。まずはその部分を修正します。

-        image: puckel/docker-airflow:1.10.9
+        image: puckel/docker-airflow:latest

ついでに、apache-airflowのバージョンが1.10.9と少し古いので、1.10.12にあげておきます。Dockerfileの以下の部分を修正します。

-ARG AIRFLOW_VERSION=1.10.9
+ARG AIRFLOW_VERSION=1.10.12
SQLAlchemyでエラー

sqlalchemy.exc.NoInspectionAvailable: No inspection system is available for object of type <class 'method'>

こちらでissueが上がっているのですが、2020年11月時点だとSQLAlchemyの最新バージョンには少し問題があるようです。いずれ解決されると思いますが、現状においてはDockerfileに以下を追加して回避する必要があります。

RUN pip install 'SQLAlchemy==1.3.15'
cattrが見つからない

ImportError: cannot import name 'resolve_types' from 'attr' (/usr/local/lib/python3.7/site-packages/attr/init.py)

これはcattrというパッケージが存在しないためにおこっている様なのですが、これもおそらく一時的なものです。こちらに書いてある様に、Dockerfileに対応を入れました。

RUN pip install apache-airflow==1.10.12 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"
コンテナ内からホストマシンのDockerデーモンにアクセスできない

{{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

DockerOperatorの引数で指定した、DockerデーモンのUnixソケットパスdocker_url="unix://var/run/docker.sock"が見つからないというエラーでした。こちらを参考に対応していきます。

まず、docker-compose-CeleryExecutor.ymlのworkerコンテナの設定を追加します。これでホストマシンのDockerデーモンのUnixソケットパスがマウントされます。

     worker:
         volumes:
             - ./dags:/usr/local/airflow/dags
+            - /var/run/docker.sock:/var/run/docker.sock

この状態だとまだ権限が足りず、以下のエラーが発生します。

{{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))

まずDockerfileを修正して、airflowユーザにsudo権限を与えます。

RUN apt-get update && \
    apt-get -y install sudo
RUN echo airflow:airflow | chpasswd && adduser airflow sudo

さらにentrypoint.shを修正し、コンテナ起動時に/var/run/docker.sockのパーミッションを変更します。

if [ -e /var/run/docker.sock ]; then 
  echo airflow | sudo -S chmod 777 /var/run/docker.sock; 
fi

DAGを実行

ここまででやっとDAGが正常に実行されます。実行ログはこんな感じになりました。赤枠のところを見ると、30秒スリープしてINFO - 2というログが出力されていることが分かると思います。

参考までですが、残念なことにコンテナ内のログが最初の数行?だけ出力されません。上のキャプチャでも本来INFO - 1と出力されて欲しいのですが出力されていません。とはいえ、影響も少ないですし、ISSUEも上がっているみたいなのでいずれ改善されると思います。

自分で作ったDockerイメージをDockerOperatorで動かす

ここは検証せずとも動くはずですが、pythonからprintしたログがちゃんと出力されるか、など少し気になるのでやっておきます。

Dockerイメージを作成

/docker_python_sampleというフォルダを作ってそのなかにDockerfileを作成します。entrypointでpythonを実行してprintするだけの超シンプルな内容です。

FROM python:latest
ENTRYPOINT [ "python", "-c", "print('python executed.')" ]

上記のDockerfileからDockerイメージを作ります。

docker build -t python-sample:latest ./docker_python_sample

DAGを作成

/dags配下にdocker_python.pyを作りました。内容はdocker_bash.pyとほぼ同じで、DockerOperatorのimageパラメータを変更しただけです。

    t2 = DockerOperator(
        image='python_sample:latest',
    )

DAGを実行

DAGをtriggerしたところ問題なく実行されました。少し懸念していたログも問題なく出力されている様です。

環境変数を読み込む

Dockerコンテナを起動する際、環境変数ファイル(.envなど)から環境変数を読み込むことが多いと思いますが、DockerOperatorには環境変数を読み込む機能はないみたいです。しかし、environmentパラメータにdict形式で環境変数を渡すことはできます。なので、python-dotenvを使って環境変数ファイルをdictに読み込んでenvironmentパラメータに渡すようにしました。

まずは、Dockerfileにpython-dotenvインストール処理を追加します。

RUN pip install python-dotenv

次に、docker-compose-CeleryExecutor.ymlに環境変数ファイルのマウント設定を追加します。

    worker:
        volumes:
            - ./envfiles:/usr/local/airflow/envfiles

最後に、DockerOperatorを実行する箇所で、環境変数ファイルを読み込んで渡します。

from dotenv import dotenv_values
DockerOperator(
    image="python:latest",
    environment=dotenv_values(dotenv_path='/usr/local/airflow/envfiles/sample.env'),  #  環境変数読み込み
    command="""python -c "from os import environ; print(environ.get('KEY1')); print(environ.get('KEY2'));" """  #  環境変数をログ出力
)

問題なく環境変数がロードされていることをログで確認できました。

[2020-11-07 01:51:09,385] {{docker_operator.py:210}} INFO - Starting docker container from image python:latest
[2020-11-07 01:51:09,912] {{docker_operator.py:243}} INFO - VALUE1  # <= 環境変数の値がログ出力されている
VALUE2  # <= 環境変数の値がログ出力されている

クレデンシャルファイルを読み込む

クレデンシャルファイル(例えばGCPのサービスアカウントのJSONキーなど)をDockerイメージに含めるのは避けたいので、DockerOperatorのvolumesパラメータでクレデンシャルファイルをマウントしてDockerOperator内から参照できるようにします。

DockerOperator(
    volumes=["/Users/rinoguchi/workspace/docker-airflow/credentials/:/credentials/"],
)

実はこの設定でめっちゃハマりました。
volumes=<host_path>:<container_path>という形式で値を設定するのですが、<host_path>の部分はMacOS上でのクレデンシャルファイルのパスを指定する必要があります。今回の構成では、MacOSのDockerエンジン上でAirflowのDockerコンテナを立ち上げ、そこでDockerOperatorでアプリを起動するものの、アプリのDockerコンテナ自体はMacOS上のDockerエンジンで動くためです。

ちなみに、間違って'valumes=/usr/local/airflow/credentials/:/credentials/のようにAirflowのDockerコンテナ内のパスを指定していた時は、以下のようなエラーが出てました。Dockerはマウント元のファイルやディレクトリが存在しない時、空のディレクトリとしてマウントしてしまうらしいです。

IsADirectoryError: [Errno 21] Is a directory: '/credentials/test-docker-operator-xxxxxxxx.json'

DAG実行時にJSONでパラメータ設定

Airflowのバージョンがv1.10.10以降であればDAG実行時に、JSON形式でパラメータを設定することができます。公式のドキュメントはこちらです。

今回は、「pythonのDockerイメージのバージョン」と「pythonコマンドへの引数」をパラメータとして渡すことを考えます。

まずは、DAGを実装します。

    DockerOperator.template_fields = tuple(list(DockerOperator.template_fields) + ["image"])  # NOTE: テンプレート対象に`image`を追加
    DockerOperator(
        image="python:{{ dag_run.conf['version'] }}",
        command="python {{ dag_run.conf['args'] }}",
    )

Airflowはこちらに記載されているように、JINJA2テンプレートを使ってパラメータ埋め込みとマクロを提供しています。JINJA2で利用可能な変数やマクロはこちらで確認できます。

DAG実行時のパラメータは、Operatorの引数のうち、JINJA2テンプレート対応した引数の中でのみ利用可能で、dag_run.conf変数に格納されています。テンプレート対応しているかどうかは、APIドキュメント(templated)と記載されているのでそこで確認するか、docstringでも確認できます。

今回は、commandはテンプレート対象だったのですが、imageは対象外でした。実装を確認したところ、テンプレート対象かどうかはOperatorクラスのtemplate_fieldsというクラス変数で定義されていたため、それを上書きしてあげることでimageもテンプレート対象にすることができました。

あとは、Configuration JSON (Optional)に以下のように設定し、DAGを実行します。

{
    "version": "3.10.0a2",
    "args": "-c \"from datetime import datetime; print(datetime.now())\""
}

ちゃんと期待通りに実行されていることをログで確認できました。

[2020-11-07 02:15:28,890] {{docker_operator.py:210}} INFO - Starting docker container from image python:3.10.0a2  # <= 指定したバージョンになってる
[2020-11-07 02:15:29,372] {{docker_operator.py:243}} INFO - 2020-11-07 02:15:29.372122  # <= 指定した`datetime.now()`が実行されている

GCR(Google Cloud Registry)上のイメージを利用する

GCRはPrivate Registoryなので認証しないとDockerイメージをpullすることができません。通常、GCRからイメージをpullする際はdockerコマンドの設定を追加して、DCRからdocker pull際にはgcloudコマンドをクレデンシャルヘルパーとして利用し認証を行ってpullします。

しかしDockerOperatorの場合はgcloudコマンドを利用できませんので、DockerHookを使って認証を行います。

まずは、WebUIのメニューから「Admin」>「Connections」をクリックしてConnections画面を開き、さらに「Create」をクリックして以下のようにConnection定義を作成します。

  • Conn Id: 好きな値を
  • Conn Type: Docker Registryを選択
  • Registry URL: GCRのリポジトリ名を正確に記載
  • Username: _json_keyを固定で記載
  • Password: サービスアカウントのJSONキーファイルの中身のjson全体をCopy&Paste

次に、DockerOperatorの引数を以下のようにします。

  • image: GCRのリポジトリ名を記載
  • docker_conn_id: 先ほど作成してコネクション定義のConn Idを記載
    DockerOperator(
        image="asia.gcr.io/cloud-registry-test/python-sample:latest",
        docker_conn_id="gcr_conn",
    )

DAGを実行すると以下のようにログが出力され、GCRからpython-sample:latestのイメージをpullして実行できていることがわかりました。

[2020-11-07 07:01:57,489] {{taskinstance.py:901}} INFO - Executing <Task(DockerOperator): pull_from_gcr> on 2020-11-07T07:01:52.591311+00:00
[2020-11-07 07:01:57,493] {{standard_task_runner.py:54}} INFO - Started process 65 to run task
[2020-11-07 07:01:57,523] {{standard_task_runner.py:77}} INFO - Running: ['airflow', 'run', 'pull_from_gcr', 'pull_from_gcr', '2020-11-07T07:01:52.591311+00:00', '--job_id', '42', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/pull_from_gcr.py', '--cfg_path', '/tmp/tmpwayn7gp0']
[2020-11-07 07:01:57,525] {{standard_task_runner.py:78}} INFO - Job 42: Subtask pull_from_gcr
[2020-11-07 07:01:57,582] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: pull_from_gcr.pull_from_gcr 2020-11-07T07:01:52.591311+00:00 [running]> b87faasdfjhe
[2020-11-07 07:01:57,631] {{base_hook.py:89}} INFO - Using connection to: id: gcr_conn. Host: asia.gcr.io/cloud-registry-test, Port: None, Schema: None, Login: _json_key, Password: XXXXXXXX, extra: None # <= GCRに接続している
[2020-11-07 07:01:58,735] {{docker_operator.py:268}} INFO - Pulling docker image asia.gcr.io/cloud-registry-test/python-sample:latest # <= GCRからイメージをpullしている
[2020-11-07 07:01:59,825] {{docker_operator.py:273}} INFO - Pulling from cloud-registry-test/python-sample
[2020-11-07 07:01:59,829] {{docker_operator.py:273}} INFO - Digest: sha256:7vfq87vqreghjiqwef978vchdqwuirweokj2fed8becd235
[2020-11-07 07:01:59,839] {{docker_operator.py:273}} INFO - Status: Downloaded newer image for asia.gcr.io/cloud-registry-test/python-sample:latest # <= イメージのダウンロード完了
[2020-11-07 07:01:59,840] {{docker_operator.py:210}} INFO - Starting docker container from image asia.gcr.io/cloud-registry-test/python-sample:latest
[2020-11-07 07:02:00,317] {{docker_operator.py:243}} INFO - python executed. # <= コンテナ起動時の`ENTRYPOINT [ "python", "-c", "print('python executed.')" ]`の出力結果
[2020-11-07 07:02:00,553] {{taskinstance.py:1070}} INFO - Marking task as SUCCESS.dag_id=pull_from_gcr, task_id=pull_from_gcr, execution_date=20201107T070152, start_date=20201107T070157, end_date=20201107T070200
[2020-11-07 07:02:02,450] {{local_task_job.py:102}} INFO - Task exited with return code 0

Airflow再起動時の注意点

今回の構成では、docker-composeでAirflowのDockerコンテナ群を起動しています。Dockerfilelを更新した際にはコンテナを再作成したいですが、postgresにDAGの実行履歴や管理画面での設定が記録されていますので、これを絶対に削除してはいけません。

何も考えず以下を実行すると、全てのコンテナが削除されるので要注意です。絶対にdocker-compose downを実行しないようにしましょう。

docker-compose -f docker-compose-CeleryExecutor.yml down # <= これで全コンテナが削除される!
docker-compose -f docker-compose-CeleryExecutor.yml up -d

たぶん、docker-compose downさえやらなければ問題ないのですが、個人的には再作成が必要な時は、以下のようにpostgresredis以外のコンテナを明示的に指定して再作成するようにしてます。変更がないコンテナは再作成されません。--no-depsをつけることで他のコンテナとの依存関係を無視してくれます。

docker-compose -f docker-compose-CeleryExecutor.yml up -d --no-deps scheduler flower webserver worker
> docker-airflow_flower_1  is up-to-date
> Recreating docker-airflow_webserver_1 ... 
> Recreating docker-airflow_webserver_1 ... done
> docker-airflow_scheduler_1 is up-to-date
> docker-airflow_worker_1 is up-to-date

ちなみに、コンテナをリスタートしたいだけの時は、以下のようにしてます。

docker-compose -f docker-compose-CeleryExecutor.yml restart webserver

さいごに

AirflowでDockerOperatorを動かす件、ネット上に情報が少なくて思ったより時間がかかりましたが、大体のケースは網羅できたんじゃないかと思いますので、ここらで検証は終わりにしようと思います。

ちなみに、docker-airflowをforkして検証したリポジトリは こちら になりますので、興味があれば参照ください。