Шаг 1 из 1

Если у вас ранее был опыт работы с Luigi, то Operator в Apache Airflow это аналог класса Task в Luigi. Operator это звено в цепочке задач, которое образует DAG. Представим, что вы хотите описать ряд зависимых между собой задач. Например, вам нужно скачать изображение с удалённого хоста и пометить его водным знаком. Такая цепочка может состоять из двух задач:

  1. Скачивание изображения
  2. Накладывание водяного знака

Каждый из этих шагов и будет отдельным оператором в терминах Apache Airflow.

Если снова взглянуть на картинку из описания DAG, то каждый кружочек это отдельный оператор Airflow.

В Airflow есть ряд готовых операторов, например:

  • PythonOperator — оператор для исполнения python кода
  • BashOperator — оператор для запуска bash скриптов/команд
  • PostgresOperator — оператор для вызова SQL запросов в PostgreSQL БД
  • RedshiftToS3Transfer — оператор для запуска UNLOAD команды из Redshift в S3
  • EmailOperator — оператор для отправки электронных писем

Полный список стандартных операторов можно найти в документации Apache Airflow. PythonOperator, пожалуй, это самый популярный оператор, т.к. он позволяет запускать код на Python.

Начиная с Apache Airflow 2.0 у разработчиков появилась возможность элегантным образом описывать DAG и операторы, используя декораторы.

У читателя может возникнуть вопрос, а можно ли передавать возвращаемые значения функций при запуске PythonOperator? Да, ранее это нужно было делать явно, используя механизм XСom. Начиная со второй версии Airflow при использовании декораторов dag и task, это происходит "под капотом" за счёт чего код остаётся чистым. Позже мы увидим как это работает на практике. От себя лишь хочется добавить, что я не рекомендую передавать большой объём данных между операторами, т.к. коммуникация происходит через базу данных путём сериализации/десериализации объектов, что в итоге замедлит выполнение вашего кода.

Сенсоры

Сенсоры это разновидность операторов, используемых для организации событийно-ориентированных дата пайплайнов. В Apache Airflow присутствуют следующие сенсоры:

  • PythonSensor — ждём, когда функция вернёт True
  • S3Sensor — проверяет наличие объекта по ключу в S3-бакете
  • RedisPubSubSensor — проверяет наличие сообщения в pub-sub очереди
  • RedisKeySensor — проверяет существует ли переданный ключ в Redis хранилище

Если успешность вашего пайплайна зависит от наступления события, например, появления файла на удалённом хосте или записи в базе данных, то вам нужен сенсор.

Комментарии

"сериализации/десериализации объетов, что в итоге замедлит выполнение вашего кода." - Вот сдесь поправте слово - объетов

Спасибо, поправил! 🤝

>Позже практике мы увидим как это работает Опечатка