Шаг 1 из 1

Прелесть любого workflow менеджера в умении формировать граф зависимостей между тасками и успешно его выполнять. В предыдущем примере мы рассмотрели пайплайн всего лишь с одним таском PythonOperator even_only. Но в реальных пайплайнах количество тасков может достигать огромных значений, давайте добавим ещё один таск, и сделаем его зависимым от первого.

Я буду использовать DummyOperator, который ничего не делает. Все исходники лежат в репозитории.

import datetime as dt

from airflow.models import DAG
from airflow.operators.python import PythonOperator, get_current_context
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2021, 1, 20),
}

def even_only():
    context = get_current_context()
    execution_date = context['execution_date']

    if execution_date.day % 2 != 0:
        raise ValueError(f'Odd day: {execution_date}')

with DAG(dag_id='dag_with_two_tasks',
         schedule_interval='@daily',
         default_args=default_args) as dag:

    even_only = PythonOperator(
        task_id='even_only',
        python_callable=even_only,
        dag=dag,
    )

    dummy = DummyOperator(
        task_id='dummy_task',
        dag=dag
    )

    even_only >> dummy

Обратите внимание на последнюю строку:

even_only >> dummy

Это способ указать направление выполнения тасков и их зависимость. Если в UI открыть детали DAG и посмотреть графическое представление зависимостей, то вы увидите:

Что означает, что dummy_task зависит от выполнения even_only. Этот же код можно записать в других вариациях:

# dummy.set_upstream(even_only)
# even_only.set_downstream(dummy)
# dummy << even_only

Я предпочитаю использовать вариант со стрелочками, т.к. они интуитивно понятны, направление стрелок указывает на порядок выполнения.

Если стартовать ваш новый DAG, то часть запусков провалится (в нечетный день), а другая часть будет успешно выполнена (в четный день). Я специально выбрал такой пример, чтобы у вас была возможность посмотреть в интерфейсе как выглядят ситуации с неудачными запусками пайплайнов, изучить статусы операторов.

Пример выполнения в нечетный день (11 февраля 2021):

Обратите внимание, что Airflow подкрашивает операторы в цвет, соответствующий статусу в легенде. Красный означает, что оператор even_only вернул ошибку, а оранжевый сигнализирует о том, что предшествующий таск (для dummy_task это even_only) не был успешно выполнен (upstream failed).

Важно помнить, что успешное выполнение пайплайна возможно только при условии, что все операторы завершились без ошибок.

В четных днях вы должны увидеть следующую картину:

Комментарии

"Важно помнить, что успешное выполнение пайплайна возможно только при условии, что все операторы были завершились без ошибок." - заменить "были завершились" на "были завершены" или "завершились" =)

"Важно помнить, что успешное выполнение пайплайна возможно только при условии, что все операторы завершились без ошибок." trigger_rule - ну да, ну да, пошли мы нафиг не все, а последний в цепочке, а все остальные могут быть хоть в ошибке, хоть вообще не выполняться

почему нельзя испольовать datetime now()?