Step 1 of 1

В Apache Airflow 2.0 появился новый способ описания DAG и PythonOperator — TaskFlow API. TaskFlow API это синтаксический сахар, который делает код чище и проще для чтения. Во второй версии Apache Airflow были введены 2 декоратора: dag и task.

Давайте посмотрим как выглядит предыдущий код, переписанный на TaskFlow API:

import datetime as dt

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2021, 1, 20),
}

@dag(
    default_args=default_args,
    schedule_interval='@daily',
    dag_id='taskflow_dag_with_two_operators'
)
def first_dag_taskflow():
    @task
    def even_only():
        context = get_current_context()
        execution_date = context['execution_date']

        if execution_date.day % 2 != 0:
            raise ValueError(f'Odd day: {execution_date}')

    @task
    def dummy_task():
        pass

    even_only() >> dummy_task()

    # вариант с готовым Operator из Airflow
    # even_only() >> DummyOperator(task_id='dummy_task')

main_dag = first_dag_taskflow()

Чтобы сформировать объект DAG, необходимо обернуть функцию декоратором dag. Этот декоратор принимает те же аргументы. Чтобы превратить функцию в PythonOperator, её необходимо обернуть в декоратор task.

Обратите внимание, что инстанс DAG должен быть доступен в глобальном пространстве при импорте кода планировщиком. Именно поэтому я вызываю функцию, обернутую в декоратор dag и присваиваю переменной main_dag. Если этого не сделать, то планировщик не сможет найти DAG.

Заметьте, что я явно не указываю task_id, он берётся из названия оборачиваемой функции.

Далее во всех примерах, где будет нужен PythonOperator я буду использовать TaskFlow API. Мы также рассмотрим как TaskFlow API позволяет "бесшовно" передавать возвращаемые значения из одного оператора в другой. До TaskFlow API необходимо было явно использовать XCom.

Подробный пост на тему TaskFlow API я опубликовал у себя в блоге: TaskFlow API в Apache Airflow 2.0

Comments

Здравствуйте! При попытке запустить DAG вручную получаю ошибку: To access configuration in your DAG use {{ dag_run.conf }}. Подскажите как это побороть?

Добрый день! Хм, какая версия Airflow? Возможно ли что при запуске вы передаёте какую-то строку в conf? В каком статусе DAG находится в web UI?

Version: v2.0.1, Что бы шедулер увидел новый DAG, использую команду: airflow scheduler. В колонке Runs, как и в колонке Resent Tasks, у него не подсвечен ни один кружочек.

Немного сдвинулся. Если нажать на значок запуска дага, находясь на главной странице(треугольник), попадаю на страницу http://127.0.0.1:8080/trigger?dag_id=myPostgresTest , где и получал сообщение из поста выше. Потом заметил, что на последней странице есть кнопка "Trigger". Нажимая ее dag стартует. Поясните, зачем так сделано. Зачем для ручного запуска нажимать две кнопки? Как понимать сообщение: To access configuration in your DAG use {{ dag_run.conf }}.

Где смотреть лог ошибки запуска? Находясь на главной странице, нажал на красную окружность, в столбце Rans для проверяемого дага. Попал на страницу: http://localhost:8080/dagrun/list/?_flt_3_dag_id=myPostgresTest&_flt_3_state=failed Куда идти дальше, чтоб посмотреть ошибку?