В работе с Apache Airflow вы неоднократно столкнётесь с Connections, т.к. система поощряет использование идентификаторов соединения вместо указания ссылки в коде. Для создания идентификатора необходимо перейти в раздел Admin → Connections и нажать на +
:
На форме вам необходимо заполнить несколько полей:
В endpoint
будем передавать название файла, т.к. эта часть меняется в зависимости от года и месяца.
Вот как выглядит код оператора проверки наличия файла:
check_file = SimpleHttpOperator(
method='HEAD',
endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
task_id='check_file',
http_conn_id='nyc_yellow_taxi_id'
)
Обратите внимание на endpoint
. У операторов Airflow есть т.н. template fields, куда можно передавать строку-шаблон и Airflow подставит нужные значения в зависимости от контекста. Переменная execution_date
входит в набор переменных по умолчанию, доступных в контексте выполнения DAG (DagRun), она представляет из себя объект типа Pendulum из пакета pendulum, это datetime на стероидах. В качестве шаблонизатора Apache Airflow использует Jinja2. execution_date
указывает на дату исполнения, это не дата на компьютере в момент выполнения DAG, а дата, рассчитанная относительно start_date и schedule interval. Пример как считается execution_date
я описывал здесь.
Добрый день, Влад! Чтобы использовать ресурсы JH для выполнения задач Airflow необходимо чтобы Airflow либо стоял там же где JH либо использовать распределенный Executor типа CeleryExecutor, но в этом случае airflow всё равно необходимо будет установить в окружение JH. Connections это не про выполнение задач, а скорее про хранение данных соединений, которые используются внутри DAGs и операторов.
Влад 20 Август 2021
Добрый день! Есть следующий кейс. Развернут сервер Airflow и также отдельно развернут сервер JupyterHub. На сервере Airflow стоит Python, но целевая история это использования ресурсов JupyterHub и виртуального окружения сконфигурированного на JH. То есть необходимо, чтобы операторы выполнялись на сервере JH. Вопрос: смогу ли я решить этот кейс при помощи connections и если смогу, то какие должны быть основные шаги? Спасибо!