Шаг 3 из 10

Создание connection id

В работе с Apache Airflow вы неоднократно столкнётесь с Connections, т.к. система поощряет использование идентификаторов соединения вместо указания ссылки в коде. Для создания идентификатора необходимо перейти в раздел Admin → Connections и нажать на +:

На форме вам необходимо заполнить несколько полей:

В endpoint будем передавать название файла, т.к. эта часть меняется в зависимости от года и месяца.

Вот как выглядит код оператора проверки наличия файла:

check_file = SimpleHttpOperator(
    method='HEAD',
    endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
    task_id='check_file',
    http_conn_id='nyc_yellow_taxi_id'
)

Обратите внимание на endpoint. У операторов Airflow есть т.н. template fields, куда можно передавать строку-шаблон и Airflow подставит нужные значения в зависимости от контекста. Переменная execution_date входит в набор переменных по умолчанию, доступных в контексте выполнения DAG (DagRun), она представляет из себя объект типа Pendulum из пакета pendulum, это datetime на стероидах. В качестве шаблонизатора Apache Airflow использует Jinja2. execution_date указывает на дату исполнения, это не дата на компьютере в момент выполнения DAG, а дата, рассчитанная относительно start_date и schedule interval. Пример как считается execution_date я описывал здесь.

Комментарии

Добрый день! Есть следующий кейс. Развернут сервер Airflow и также отдельно развернут сервер JupyterHub. На сервере Airflow стоит Python, но целевая история это использования ресурсов JupyterHub и виртуального окружения сконфигурированного на JH. То есть необходимо, чтобы операторы выполнялись на сервере JH. Вопрос: смогу ли я решить этот кейс при помощи connections и если смогу, то какие должны быть основные шаги? Спасибо!

Добрый день, Влад! Чтобы использовать ресурсы JH для выполнения задач Airflow необходимо чтобы Airflow либо стоял там же где JH либо использовать распределенный Executor типа CeleryExecutor, но в этом случае airflow всё равно необходимо будет установить в окружение JH. Connections это не про выполнение задач, а скорее про хранение данных соединений, которые используются внутри DAGs и операторов.