Airflowは、ワークフローをプログラムで作成、スケジュール、および監視するためのプラットフォームです。仕事でAirflowを採用することにしたので、今回はAirflowをローカルPC上で動かして、DockerOperatorで任意のDockerコンテナを実行しつつ、必要な要素(DAG実行時のパラメータ指定とか環境変数渡しなど)を試していきたいと思います。
Airflowについて
公式のトップページから引っ張ってきただけですが、以下のような原則・特徴があるそうです。
原則
スケーラブル
Airflowはモジュール化されたアーキテクチャを備えていて、Messageキューを使用して任意の数のワーカーを調整でき、無限に拡張できる
動的
AirflowのパイプラインはPythonで定義されており、動的にパイプラインを生成することができる
拡張可能
独自の演算子を簡単に定義し、ライブラリを拡張して、環境に適した抽象化レベルに適合させることができる
エレガント
Airflowのパイプラインは無駄がなく明確であり、パラメータ化はJinjaテンプレートエンジンを使用してコアに組み込まれている
特徴
純粋なPython
コマンドラインやXMLの黒魔術は必要なし!標準のPython機能を使用して、スケジュールの日時形式やタスクを動的に生成するループなどを実装し、ワークフローを完全な柔軟に構築することができる
便利なUI
堅牢で最新のWebアプリケーションを介してワークフローを監視、スケジュール、および管理できる。タスクのステータスとログを常に完全にWebUIから把握できる
堅牢な統合
Airflowは、GCP、AWS、Azure、およびその他の多くのサードパーティサービスで挿入して実行可能なオペレーターを備えている。これにより、Airflowを現在のインフラストラクチャに簡単に適用し、次世代テクノロジーに拡張できる
使いやすい
Pythonの知識があれば、誰でもワークフローをデプロイできる。Apache Airflowは、パイプラインの範囲を制限しない。これにより、MLモデルの構築、データの転送、インフラストラクチャの管理などをパイプラインの中で自由に定義・実行することができる
オープンソース
改善したい場合はIssueをあげることもできるし、Pull Requestをすることもできる。作業は単純でハードルも長時間の手順も必要なし
採用理由
ワークフローエンジンはこれまで Jenkins を利用していたのですが、何でもできる反面設定が複雑で古臭い印象があり、別のワークフローエンジンに乗り換えたいと思っていました。以下の要件で乗り換え先を探し始めたのですが、Airflowが全て条件を満たしていたので採用することにしました。
- UIからパラメータを指定してジョブを実行できる
- => DAG実行時にJSONで指定できる。v1.10.10 で導入された
- ジョブ実行履歴およびログをUIで確認できる
- => 充実したWebUIが提供されており、履歴やログを見るぐらいは余裕でできる
- ジョブを個別の独立した実行環境で実行できる
- => DockerOperatorを使えばできる
検証
DockerでAirflowを動かす
本来はこちらを元にインストールした方が勉強になると思うのですが、今回はサクッとDockerでAirflowを動かして、さらにその中でDockerOperatorを使って任意のアプリケーションを動かせるかどうかを検証したいので、docker-airflowを利用することにしました。
dockerおよびdocker-composeをインストール
まだインストールしてない場合は、以下のサイトを参考にインストールします。
git clone
docker-airflowをgit cloneします。
git clone https://github.com/puckel/docker-airflow.git
cd docker-airflow
Dockerイメージを作成
DockerFileを元にpuckel/docker-airflow:latest
というタグ名でDockerイメージを作成します。
docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .
> Step 1/25 : FROM python:3.7-slim-buster
> ---> 9703nlsdfgego
> Step 25/25 : CMD ["webserver"]
> ---> Running in 7sadkdbafsv
> 省略
> Running in bf7nldasferb
> Removing intermediate container we4lndfg9fg
> ---> 8safdklje4nfgdskjg
> Successfully built 8safdklje4nfgdskjg
> Successfully tagged puckel/docker-airflow:latest
Airflowを起動
Executorとはタスクスケジューラのことで色々な選択肢があるようなのですが、今回は Celery を利用するCeleryExecutorで検証を進めようと思います。
docker-composeでAirflowを起動します。
docker-compose -f docker-compose-CeleryExecutor.yml up -d
> Starting docker-airflow_webserver_1 ... done
> Starting docker-airflow_scheduler_1 ... done
> Starting docker-airflow_flower_1 ... done
> Starting docker-airflow_worker_1 ... done
> Starting docker-airflow_postgres_1 ... done
> Starting docker-airflow_redis_1 ... done
起動してみるとわかるのですが、色々なコンテナが起動します。上から順に軽く説明します。
- webserver => AirflowのWebUI。ここからDAGを実行したり実行結果を確認したりする。Flask で実装されている。
- scheduler => タスクケジューラのCeleryを起動
- flower => CeleryをモニタリングするためのWebUIであるFlower を起動
- worker => タスクが実行される
- postgres => ジョブ管理用の各種データを管理する
- redis => ブローカー(メッセージキュー)として、schedulerとworkerを仲介する
Web UI表示
以下の画面が表示されればここまでは問題なく動いています
- airflow
- flower
チュートリアルDAGを実行
http://localhost:8080 で開くAirflowのトップページには読み込んだDAGの一覧が表示されます。
デフォルトでturorial(tuto.py)が表示されていていますが、Off=>OnにtoggleすることでDAGが有効化され、過去にスケジュールされたDAG(タスクパイプライン)がまとめて実行されます。
DAG名のリンクをクリックすると詳細画面(Tree View)に遷移します。ここではDAGの定義がTree形式で左側に表示され、右側にDAGの実行履歴が表示されます。縦で一つのDAG実行になります。
■が一つのタスクになっていて、クリックすると以下の画面が表示されます。
こちらから、ログを表示したりすることができます。
あとはTrigger DAG
からDAGを手動で実行することもできます。
チュートリアルDAGコードを確認
DAGのコードはCodeボタンから確認できます。プロジェクト配下のdags/tuto.py
と同一です。
コード全文です。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))
## t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id="templated",
bash_command=templated_command,
params={"my_param": "Parameter I passed in"},
dag=dag,
)
t2.set_upstream(t1)
t3.set_upstream(t1)
-
default_args
でDAGレベルでのオペレータのパラメータを定義します。定義できる値はBaseOperatorのパラメータ です。こちらはあくまでデフォルト値なのでOperatorレベルで上書きすることができます。 -
DAG("tutorial", schedule_interval=timedelta(1))
で1日毎にスケジュール実行する設定になっている -
t1
,t2
,t3
は全てBashOperatorを利用している- BashOperatorはwokerコンテナのlinuxサーバ内で実行されるため、必要なlinuxパッケージは、DockerFileで
apt get
などで事前にインストールする
- BashOperatorはwokerコンテナのlinuxサーバ内で実行されるため、必要なlinuxパッケージは、DockerFileで
-
t2.set_upstream(t1)
、t3.set_upstream(t1)
は、t2
とt3
の上流がt1
という設定だが少しわかりにくい-
>>
を使って実装できる。以下のどちらでもOK t1 >> (t2, t3)
t1 >> [t2, t3]
-
DockerOperatorを動かす
とりあえず、tutorialのDAGを実行し内容もだいたい把握できた気がするので、次は本命のDockerOperatorを試していきたいと思います。
DAGを作成
/dags
配下にdocker_bash.py
を作ります。これは、DockerOperatorでcentos:latest
イメージを使って、bashコマンド(30秒waitする)を実行するDAGです。
内容は見れば分かる様な内容だと思います。一点補足すると、docker_url="unix://var/run/docker.sock"
はデフォルト値そのままなのにあえて書いてます。これは、DockerデーモンのUnixソケットパスで、これを通じてDockerコンテナの起動などを行います。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 11, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG("docker_bash_sample", default_args=default_args, schedule_interval=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_start", bash_command="echo $(date '+%Y-%m-%d %H:%M:%S') started.")
t2 = DockerOperator(
task_id='docker_command',
image='centos:latest',
api_version='auto',
auto_remove=True,
command="""/bin/bash -c \'echo "1" && sleep 30 && echo "2"\'""",
docker_url="unix://var/run/docker.sock",
network_mode="bridge",
)
t3 = BashOperator(task_id="print_finish", bash_command="echo $(date '+%Y-%m-%d %H:%M:%S') finished.")
t1 >> t2 >> t3
トラブル対応
AirflowのWebUIをブラウザでreloadするだけで新しいDAGが表示されますので、そこからDAGを実行してサクッと動けばいいのですが、色々とエラーが発生しますので、対応していく必要があります。
dockerパッケージが見つからない
Broken DAG: [/usr/local/airflow/dags/docker_bash.py] No module named 'docker'
Dockerfileでpythonのdocker
パッケージをインストールします。
RUN pip install docker
さらに、docker-compose-CeleryExecutor.yml
を修正します。実はここまではせっかく作ったAirflowのDockerイメージpuckel/docker-airflow:latest
ではなく、Docker Registryからダウンロードしたpuckel/docker-airflow:1.10.9
を利用していました。まずはその部分を修正します。
- image: puckel/docker-airflow:1.10.9
+ image: puckel/docker-airflow:latest
ついでに、apache-airflowのバージョンが1.10.9
と少し古いので、1.10.12
にあげておきます。Dockerfileの以下の部分を修正します。
-ARG AIRFLOW_VERSION=1.10.9
+ARG AIRFLOW_VERSION=1.10.12
SQLAlchemyでエラー
sqlalchemy.exc.NoInspectionAvailable: No inspection system is available for object of type <class 'method'>
こちらでissueが上がっているのですが、2020年11月時点だとSQLAlchemyの最新バージョンには少し問題があるようです。いずれ解決されると思いますが、現状においてはDockerfileに以下を追加して回避する必要があります。
RUN pip install 'SQLAlchemy==1.3.15'
cattrが見つからない
ImportError: cannot import name 'resolve_types' from 'attr' (/usr/local/lib/python3.7/site-packages/attr/init.py)
これはcattr
というパッケージが存在しないためにおこっている様なのですが、これもおそらく一時的なものです。こちらに書いてある様に、Dockerfileに対応を入れました。
RUN pip install apache-airflow==1.10.12 \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"
コンテナ内からホストマシンのDockerデーモンにアクセスできない
{{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
DockerOperatorの引数で指定した、DockerデーモンのUnixソケットパスdocker_url="unix://var/run/docker.sock"
が見つからないというエラーでした。こちらを参考に対応していきます。
まず、docker-compose-CeleryExecutor.yml
のworkerコンテナの設定を追加します。これでホストマシンのDockerデーモンのUnixソケットパスがマウントされます。
worker:
volumes:
- ./dags:/usr/local/airflow/dags
+ - /var/run/docker.sock:/var/run/docker.sock
この状態だとまだ権限が足りず、以下のエラーが発生します。
{{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))
まずDockerfileを修正して、airflowユーザにsudo権限を与えます。
RUN apt-get update && \
apt-get -y install sudo
RUN echo airflow:airflow | chpasswd && adduser airflow sudo
さらにentrypoint.sh
を修正し、コンテナ起動時に/var/run/docker.sock
のパーミッションを変更します。
if [ -e /var/run/docker.sock ]; then
echo airflow | sudo -S chmod 777 /var/run/docker.sock;
fi
DAGを実行
ここまででやっとDAGが正常に実行されます。実行ログはこんな感じになりました。赤枠のところを見ると、30秒スリープしてINFO - 2
というログが出力されていることが分かると思います。
参考までですが、残念なことにコンテナ内のログが最初の数行?だけ出力されません。上のキャプチャでも本来INFO - 1
と出力されて欲しいのですが出力されていません。とはいえ、影響も少ないですし、ISSUEも上がっているみたいなのでいずれ改善されると思います。
自分で作ったDockerイメージをDockerOperatorで動かす
ここは検証せずとも動くはずですが、pythonからprintしたログがちゃんと出力されるか、など少し気になるのでやっておきます。
Dockerイメージを作成
/docker_python_sample
というフォルダを作ってそのなかにDockerfileを作成します。entrypointでpythonを実行してprintするだけの超シンプルな内容です。
FROM python:latest
ENTRYPOINT [ "python", "-c", "print('python executed.')" ]
上記のDockerfileからDockerイメージを作ります。
docker build -t python-sample:latest ./docker_python_sample
DAGを作成
/dags
配下にdocker_python.py
を作りました。内容はdocker_bash.py
とほぼ同じで、DockerOperatorのimage
パラメータを変更しただけです。
t2 = DockerOperator(
image='python_sample:latest',
)
DAGを実行
DAGをtriggerしたところ問題なく実行されました。少し懸念していたログも問題なく出力されている様です。
環境変数を読み込む
Dockerコンテナを起動する際、環境変数ファイル(.envなど)から環境変数を読み込むことが多いと思いますが、DockerOperatorには環境変数を読み込む機能はないみたいです。しかし、environment
パラメータにdict形式で環境変数を渡すことはできます。なので、python-dotenvを使って環境変数ファイルをdictに読み込んでenvironment
パラメータに渡すようにしました。
まずは、Dockerfileにpython-dotenvインストール処理を追加します。
RUN pip install python-dotenv
次に、docker-compose-CeleryExecutor.ymlに環境変数ファイルのマウント設定を追加します。
worker:
volumes:
- ./envfiles:/usr/local/airflow/envfiles
最後に、DockerOperatorを実行する箇所で、環境変数ファイルを読み込んで渡します。
from dotenv import dotenv_values
DockerOperator(
image="python:latest",
environment=dotenv_values(dotenv_path='/usr/local/airflow/envfiles/sample.env'), # 環境変数読み込み
command="""python -c "from os import environ; print(environ.get('KEY1')); print(environ.get('KEY2'));" """ # 環境変数をログ出力
)
問題なく環境変数がロードされていることをログで確認できました。
[2020-11-07 01:51:09,385] {{docker_operator.py:210}} INFO - Starting docker container from image python:latest
[2020-11-07 01:51:09,912] {{docker_operator.py:243}} INFO - VALUE1 # <= 環境変数の値がログ出力されている
VALUE2 # <= 環境変数の値がログ出力されている
クレデンシャルファイルを読み込む
クレデンシャルファイル(例えばGCPのサービスアカウントのJSONキーなど)をDockerイメージに含めるのは避けたいので、DockerOperatorのvolumes
パラメータでクレデンシャルファイルをマウントしてDockerOperator内から参照できるようにします。
DockerOperator(
volumes=["/Users/rinoguchi/workspace/docker-airflow/credentials/:/credentials/"],
)
実はこの設定でめっちゃハマりました。
volumes=<host_path>:<container_path>
という形式で値を設定するのですが、<host_path>
の部分はMacOS上でのクレデンシャルファイルのパスを指定する必要があります。今回の構成では、MacOSのDockerエンジン上でAirflowのDockerコンテナを立ち上げ、そこでDockerOperatorでアプリを起動するものの、アプリのDockerコンテナ自体はMacOS上のDockerエンジンで動くためです。
ちなみに、間違って'valumes=/usr/local/airflow/credentials/:/credentials/
のようにAirflowのDockerコンテナ内のパスを指定していた時は、以下のようなエラーが出てました。Dockerはマウント元のファイルやディレクトリが存在しない時、空のディレクトリとしてマウントしてしまうらしいです。
IsADirectoryError: [Errno 21] Is a directory: '/credentials/test-docker-operator-xxxxxxxx.json'
DAG実行時にJSONでパラメータ設定
Airflowのバージョンがv1.10.10以降であればDAG実行時に、JSON形式でパラメータを設定することができます。公式のドキュメントはこちらです。
今回は、「pythonのDockerイメージのバージョン」と「pythonコマンドへの引数」をパラメータとして渡すことを考えます。
まずは、DAGを実装します。
DockerOperator.template_fields = tuple(list(DockerOperator.template_fields) + ["image"]) # NOTE: テンプレート対象に`image`を追加
DockerOperator(
image="python:{{ dag_run.conf['version'] }}",
command="python {{ dag_run.conf['args'] }}",
)
Airflowはこちらに記載されているように、JINJA2テンプレートを使ってパラメータ埋め込みとマクロを提供しています。JINJA2で利用可能な変数やマクロはこちらで確認できます。
DAG実行時のパラメータは、Operatorの引数のうち、JINJA2テンプレート対応した引数の中でのみ利用可能で、dag_run.conf
変数に格納されています。テンプレート対応しているかどうかは、APIドキュメントに(templated)
と記載されているのでそこで確認するか、docstringでも確認できます。
今回は、command
はテンプレート対象だったのですが、image
は対象外でした。実装を確認したところ、テンプレート対象かどうかはOperatorクラスのtemplate_fields
というクラス変数で定義されていたため、それを上書きしてあげることでimage
もテンプレート対象にすることができました。
あとは、Configuration JSON (Optional)
に以下のように設定し、DAGを実行します。
{
"version": "3.10.0a2",
"args": "-c \"from datetime import datetime; print(datetime.now())\""
}
ちゃんと期待通りに実行されていることをログで確認できました。
[2020-11-07 02:15:28,890] {{docker_operator.py:210}} INFO - Starting docker container from image python:3.10.0a2 # <= 指定したバージョンになってる
[2020-11-07 02:15:29,372] {{docker_operator.py:243}} INFO - 2020-11-07 02:15:29.372122 # <= 指定した`datetime.now()`が実行されている
GCR(Google Cloud Registry)上のイメージを利用する
GCRはPrivate Registoryなので認証しないとDockerイメージをpullすることができません。通常、GCRからイメージをpullする際はdocker
コマンドの設定を追加して、DCRからdocker pull
際にはgcloud
コマンドをクレデンシャルヘルパーとして利用し認証を行ってpullします。
しかしDockerOperatorの場合はgcloud
コマンドを利用できませんので、DockerHookを使って認証を行います。
まずは、WebUIのメニューから「Admin」>「Connections」をクリックしてConnections画面を開き、さらに「Create」をクリックして以下のようにConnection定義を作成します。
- Conn Id: 好きな値を
- Conn Type:
Docker Registry
を選択 - Registry URL: GCRのリポジトリ名を正確に記載
- Username:
_json_key
を固定で記載 - Password: サービスアカウントのJSONキーファイルの中身のjson全体をCopy&Paste
次に、DockerOperatorの引数を以下のようにします。
- image: GCRのリポジトリ名を記載
- docker_conn_id: 先ほど作成してコネクション定義の
Conn Id
を記載DockerOperator( image="asia.gcr.io/cloud-registry-test/python-sample:latest", docker_conn_id="gcr_conn", )
DAGを実行すると以下のようにログが出力され、GCRからpython-sample:latest
のイメージをpullして実行できていることがわかりました。
[2020-11-07 07:01:57,489] {{taskinstance.py:901}} INFO - Executing <Task(DockerOperator): pull_from_gcr> on 2020-11-07T07:01:52.591311+00:00
[2020-11-07 07:01:57,493] {{standard_task_runner.py:54}} INFO - Started process 65 to run task
[2020-11-07 07:01:57,523] {{standard_task_runner.py:77}} INFO - Running: ['airflow', 'run', 'pull_from_gcr', 'pull_from_gcr', '2020-11-07T07:01:52.591311+00:00', '--job_id', '42', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/pull_from_gcr.py', '--cfg_path', '/tmp/tmpwayn7gp0']
[2020-11-07 07:01:57,525] {{standard_task_runner.py:78}} INFO - Job 42: Subtask pull_from_gcr
[2020-11-07 07:01:57,582] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: pull_from_gcr.pull_from_gcr 2020-11-07T07:01:52.591311+00:00 [running]> b87faasdfjhe
[2020-11-07 07:01:57,631] {{base_hook.py:89}} INFO - Using connection to: id: gcr_conn. Host: asia.gcr.io/cloud-registry-test, Port: None, Schema: None, Login: _json_key, Password: XXXXXXXX, extra: None # <= GCRに接続している
[2020-11-07 07:01:58,735] {{docker_operator.py:268}} INFO - Pulling docker image asia.gcr.io/cloud-registry-test/python-sample:latest # <= GCRからイメージをpullしている
[2020-11-07 07:01:59,825] {{docker_operator.py:273}} INFO - Pulling from cloud-registry-test/python-sample
[2020-11-07 07:01:59,829] {{docker_operator.py:273}} INFO - Digest: sha256:7vfq87vqreghjiqwef978vchdqwuirweokj2fed8becd235
[2020-11-07 07:01:59,839] {{docker_operator.py:273}} INFO - Status: Downloaded newer image for asia.gcr.io/cloud-registry-test/python-sample:latest # <= イメージのダウンロード完了
[2020-11-07 07:01:59,840] {{docker_operator.py:210}} INFO - Starting docker container from image asia.gcr.io/cloud-registry-test/python-sample:latest
[2020-11-07 07:02:00,317] {{docker_operator.py:243}} INFO - python executed. # <= コンテナ起動時の`ENTRYPOINT [ "python", "-c", "print('python executed.')" ]`の出力結果
[2020-11-07 07:02:00,553] {{taskinstance.py:1070}} INFO - Marking task as SUCCESS.dag_id=pull_from_gcr, task_id=pull_from_gcr, execution_date=20201107T070152, start_date=20201107T070157, end_date=20201107T070200
[2020-11-07 07:02:02,450] {{local_task_job.py:102}} INFO - Task exited with return code 0
Airflow再起動時の注意点
今回の構成では、docker-composeでAirflowのDockerコンテナ群を起動しています。Dockerfilelを更新した際にはコンテナを再作成したいですが、postgres
にDAGの実行履歴や管理画面での設定が記録されていますので、これを絶対に削除してはいけません。
何も考えず以下を実行すると、全てのコンテナが削除されるので要注意です。絶対にdocker-compose down
を実行しないようにしましょう。
docker-compose -f docker-compose-CeleryExecutor.yml down # <= これで全コンテナが削除される!
docker-compose -f docker-compose-CeleryExecutor.yml up -d
たぶん、docker-compose down
さえやらなければ問題ないのですが、個人的には再作成が必要な時は、以下のようにpostgres
とredis
以外のコンテナを明示的に指定して再作成するようにしてます。変更がないコンテナは再作成されません。--no-deps
をつけることで他のコンテナとの依存関係を無視してくれます。
docker-compose -f docker-compose-CeleryExecutor.yml up -d --no-deps scheduler flower webserver worker
> docker-airflow_flower_1 is up-to-date
> Recreating docker-airflow_webserver_1 ...
> Recreating docker-airflow_webserver_1 ... done
> docker-airflow_scheduler_1 is up-to-date
> docker-airflow_worker_1 is up-to-date
ちなみに、コンテナをリスタートしたいだけの時は、以下のようにしてます。
docker-compose -f docker-compose-CeleryExecutor.yml restart webserver
さいごに
AirflowでDockerOperatorを動かす件、ネット上に情報が少なくて思ったより時間がかかりましたが、大体のケースは網羅できたんじゃないかと思いますので、ここらで検証は終わりにしようと思います。
ちなみに、docker-airflowをforkして検証したリポジトリは こちら になりますので、興味があれば参照ください。