Шаг 1 из 1

Командой airflow webserver -p 8080 мы запустили веб-сервер. Давайте познакомимся с пользовательским интерфейсом или как я его ещё называю — диспетчерской дата инженера.

В адресной строке браузере введите http://127.0.0.1:8080.

В предыдущем разделе мы создали пользователя airflow с паролем airflow. Необходимо войти в систему под этими данными.

Красной рамкой я выделил сообщение от Airflow, что ещё не запущен Планировщик. Что это за компонент я расскажу далее по тексту.

The scheduler does not appear to be running. The DAGs list may not update, and new tasks will not be scheduled.

На главном экране синей рамкой я выделил список примеров, которые Airflow грузит в панель по умолчанию. Чтобы отключить их загрузку необходимо в airflow.cfg поменять параметр:

load_examples = False

Если ранее этот параметр был True и вы инициализировали БД, то после изменения на False необходимо сбросить всю информацию о пайплайнах командой:

airflow db reset

Пересоздавать пользователей не нужно.

Примеры нужны исключительно в ознакомительных целях, никакой иной пользы они не несут.

Сразу после входа в панель Airflow вы видите перед собой список всех DAG, которые подгрузил Airflow для вас:

  • Pause/Unpause DAG — слева находится переключатель для включения/выключения DAG. По умолчанию все новые DAG будут остановлены. Чтобы запустить DAG его необходимо предварительно включить.
  • Колонка Owner обозначает владельца/автора DAG. Эта опция задаётся в коде, позже мы поговорим об атрибутах DAG, создавая свой первый пайлайн.
  • Runs — показывает состояние запусков прошлых DAG. Есть 3 состояния:
    1. Успешно выполнен
    2. Выполняется
    3. Есть ошибки при выполнении
  • Schedule — показывает с какой периодичностью будет запускаться DAG. Если значение None, то запуска по расписанию не будет, но можно запускать вручную, например.
  • Last Run — дата и время последнего запуска DAG
  • Recent Tasks — отражает текущее состояние последних запусков DAG (по факту последний запуск DAG и его операторов)
  • Actions — кнопки манипуляции с DAG. Можно запустить DAG вручную, обновить или удалить DAG.
  • Links — список для быстрого доступа к просмотру кода DAG, деталей выполнения, просмотру в виде графа или диаграммы Ганта, анализ времени выполнения задач и т.д.

Просмотр DAG

Если кликнуть на любой DAG из списка, то мы увидим детальную информацию по нему.

  • Tree View — просмотр DAG и зависимостей между операторами в виде дерева, также справа находится статус выполнения всех операторов, входящих в DAG с легендой. На статусы можно кликать, чтобы посмотреть более детальную информацию.
  • Graph View — фактически то же самое, что и Tree View, только в виде графа.
  • Task Duration — график времени выполнения DAG и операторов.
  • Task Tries — показывает график повторных запусков операторов по периодам. Также в Airflow есть механизм повторных запусков в случае неудач, о нём мы поговорим в практической части.
  • Landing Times — график показывает время (в зависимости от периодичности, это могут быть дни, минуты и т.д. по оси Y) прошедшее относительно периода запуска и непосредственным запуском (DagRun) планировщиком. Представьте что есть DAG с schedule_interval=@daily запуск с execution_date=2020-01-01 должен быть запланирован на 2020-01-02T00:00:00, но планировщик из-за нагрузки запускает его в 2020-01-02T11:00:00, тогда Landing Time для него будет 11 часов или 0.46 дней.
  • Gantt — отображает историю выполнения DAG в виде Диаграммы Ганта. Очень удобно, когда в DAG входит множество операторов, которые могут выполняться параллельно.
  • Details — детальная информация по DAG, включая названия операторов, автора, начальное время старта DAG и т.д.
  • Code — просмотр кода DAG.

Панель администратора Airflow


Variables

В этом разделе можно управлять переменными, которые будут храниться в базе данных Airflow. Удобно для хранения различной информации, например, адреса хостов удалённых сервисов. Нежелательно в этом разделе хранить данные доступов, т.к. переменные хранятся в открытом виде и могут быть скомпрометированы третьими лицами.

Configurations

Раздел позволяет редактировать настройки Airflow, используя графический интерфейс. По умолчанию опция отключена из соображений безопасности.

Connections

Раздел для хранения данных о доступе к различным сервисам: базам данных, облачным провайдерам, сторонним API сервисам и т.д. Я не рекомендую в DAG коде хранить пароли и секретные ключи, а использовать для этого раздел Connections, например.

При использовании Connections в коде вам необходимо лишь будет передать значение conn_id. На практике мы увидим как это удобно и просто.

Plugins

Раздел для управления плагинами Airflow. По умолчанию плагины можно подгрузить, разместив код по пути AIRFLOW_HOME/plugins.

Pools

Возможность задать пул или другими словами некое ограничение на количество параллельно выполняющихся задач. Представим, что вы хотите регулярно качать данные с внешнего сервиса. У сервиса есть ограничение на количество одновременных соединений — 2. В этом случае вы создаёте пул со значением 2 и присваиваете ему имя. Далее во всех операторах, отвечающих за скачивание данных, передаёте название этого пула.

XComs

Это механизм обмена сообщениями между операторами. Т.к. задачи могут выполняться на разных компьютерах, то получить возвращаемое значение одного оператора и передать другому привычным способом не получится (например, путём цепочек вызовов или присвоением значения переменной). Для таких ситуаций и создан XCom (cross-communication). Механизм работает через базу данных. По сути это аналог key-value базы, один оператор записывает значение под определённым ключом, а другой оператор может получить это значением, используя этот ключ. Не рекомендуется таким способом передавать большие сообщения, т.к. это требует дополнительных накладных расходов.

В этом разделе можно увидеть все существующие сообщения, созданные операторами.

Комментарии

"Я не рекомендую в DAG коде хранить пароли и секретные ключа..." - тут опечатка

Если я запускаю web server не на локальной машине, нужно ли мне как то конфигурировать airflow, или все должно работать сразу по адресу моего сервера http://my_ip:8080? У меня что-то не работает.

Позже в этом курсе будет отдельный урок по деплою Airflow на проде. Сейчас же можно пойти двумя путями: airflow webserver -H 0.0.0.0 -p 8080 или запустить Airflow локально, и проксировать запросы через nginx.

Если в Variables значении ключа докинуть постфикс _secret, то переменные хранятся в закрытом виде ;)

Здравствуйте! В airflow.cfg изменил параметр: load_examples = False Но рестарт не проходит. Команда airflow db reset Результат airflow: command not found Что нужно сделать чтоб эта команда выполнилась?

Добрый день! Если вы видите сообщение о том, что команда airflow не найдена, то скорее всего вам необходимо активировать виртуальное окружение в которое вы устанавливали Apache Airflow.

Да, выполнив такие команды в терминале: cd airflow source .venv/bin/activate смог запуска команды airflow

После того как терминал начал понимать команды, я запустил команду запуска шедулера: airflow scheduler Получаю такое сообщение: [2023-04-06 18:59:43,589] {scheduler_job.py:1247} INFO - Starting the scheduler [2023-04-06 18:59:43,590] {scheduler_job.py:1252} INFO - Processing each file at most -1 times [2023-04-06 18:59:43,681] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 370185 [2023-04-06 18:59:43,684] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs [2023-04-06 18:59:43,779] {settings.py:54} INFO - Configured default timezone Timezone('UTC') [2023-04-06 19:04:43,767] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs [2023-04-06 19:09:43,803] {scheduler_job.py:1834} INFO - Resetting orphaned tasks for active dag runs Последнее сообщение выводится каждые 5 минут. Почему выполнение команды не заканчивается? Ожидал, что после выполнения этой команды, терминал вернется в с вое исходное состояние, то есть даст вводить другие команды

Не заканчивается потому что планировщик продолжает работу. Чтобы запустить его в фоне можно воспользоваться процесс-менеджерами типа systemd или supervisord.