Прелесть любого workflow менеджера в умении формировать граф зависимостей между тасками и успешно его выполнять. В предыдущем примере мы рассмотрели пайплайн всего лишь с одним таском PythonOperator even_only
. Но в реальных пайплайнах количество тасков может достигать огромных значений, давайте добавим ещё один таск, и сделаем его зависимым от первого.
Я буду использовать DummyOperator, который ничего не делает. Все исходники лежат в репозитории.
import datetime as dt
from airflow.models import DAG
from airflow.operators.python import PythonOperator, get_current_context
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2021, 1, 20),
}
def even_only():
context = get_current_context()
execution_date = context['execution_date']
if execution_date.day % 2 != 0:
raise ValueError(f'Odd day: {execution_date}')
with DAG(dag_id='dag_with_two_tasks',
schedule_interval='@daily',
default_args=default_args) as dag:
even_only = PythonOperator(
task_id='even_only',
python_callable=even_only,
dag=dag,
)
dummy = DummyOperator(
task_id='dummy_task',
dag=dag
)
even_only >> dummy
Обратите внимание на последнюю строку:
even_only >> dummy
Это способ указать направление выполнения тасков и их зависимость. Если в UI открыть детали DAG и посмотреть графическое представление зависимостей, то вы увидите:
Что означает, что dummy_task
зависит от выполнения even_only
. Этот же код можно записать в других вариациях:
# dummy.set_upstream(even_only)
# even_only.set_downstream(dummy)
# dummy << even_only
Я предпочитаю использовать вариант со стрелочками, т.к. они интуитивно понятны, направление стрелок указывает на порядок выполнения.
Если стартовать ваш новый DAG, то часть запусков провалится (в нечетный день), а другая часть будет успешно выполнена (в четный день). Я специально выбрал такой пример, чтобы у вас была возможность посмотреть в интерфейсе как выглядят ситуации с неудачными запусками пайплайнов, изучить статусы операторов.
Пример выполнения в нечетный день (11 февраля 2021):
Обратите внимание, что Airflow подкрашивает операторы в цвет, соответствующий статусу в легенде. Красный означает, что оператор even_only
вернул ошибку, а оранжевый сигнализирует о том, что предшествующий таск (для dummy_task
это even_only
) не был успешно выполнен (upstream failed).
Важно помнить, что успешное выполнение пайплайна возможно только при условии, что все операторы завершились без ошибок.
В четных днях вы должны увидеть следующую картину:
"Важно помнить, что успешное выполнение пайплайна возможно только при условии, что все операторы завершились без ошибок." trigger_rule - ну да, ну да, пошли мы нафиг не все, а последний в цепочке, а все остальные могут быть хоть в ошибке, хоть вообще не выполняться
почему нельзя испольовать datetime now()?
hookprod 14 Декабрь 2021
"Важно помнить, что успешное выполнение пайплайна возможно только при условии, что все операторы были завершились без ошибок." - заменить "были завершились" на "были завершены" или "завершились" =)