Step 2 of 2

Скелет DAG

Планировщик Airflow ищет все возможные DAG по наличию инстанса объекта DAG в модуле (по пути, указанному в параметре dags_folder в конфиге airflow.cfg). Он должен быть доступен при импорте модуля/пакета планировщиком, поэтому его необходимо определять в глобальном пространстве имён. Также у каждого DAG должен быть уникальный dag_id, именно его вы видите в списке всех DAG, когда заходите в панель управления Airflow.

import random
import datetime as dt

from airflow.models import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2021, 1, 20),
    'retries': 2,
    'retry_delay': dt.timedelta(seconds=10),
}

def random_dice():
    val = random.randint(1, 6)
    if val % 2 != 0:
        raise ValueError(f'Odd {val}')

with DAG(dag_id='first_dag',
         schedule_interval='@daily',
         default_args=default_args) as dag:

    dice = PythonOperator(
        task_id='random_dice',
        python_callable=random_dice,
        dag=dag,
    )

Класс DAG умеет работать с контекстным менеджером with. В коде я создаю инстанс класса DAG, передаю ему уникальный dag_id, расписание запусков и параметры по умолчанию через default_args. Параметры по умолчанию удобны, когда вы хотите одни и те же настройки передать всем операторам, входящим в DAG. К слову, вы также можете задавать индивидуальные настройки для каждого оператора. Например, количество перезапусков в случае ошибок, retries, или таймаут перед повторным запуском, retry_delay. Параметр start_date также может быть у каждого оператора свой.

У нас с вами получился простой DAG с одним лишь оператором — PythonOperator. Этот оператор принимает на вход callable (функцию, исполняемый класс или lambda выражение). Каждый оператор должен иметь уникальный task_id и dag к которому он относится.

Функция random_dice случайным образом генерирует число от 1 до 6, и в случае если выпало нечетное число возбуждает исключение ValueError. Обратите внимание, что в default_args мы задали настройки для перезапуска операторов в случае ошибок — retries=2, также таймаут между повторами равен 10 секундам, retry_delay. Это как раз поможет вам понять как Airflow ведёт себя в случае появления ошибок при выполнении кода.

Comments

Здравствуйте! А почему именно from airflow.models import DAG, а не from airflow import DAG?

Здравствуйте! Куда вставлять приведенный выше код?

Добрый день! Его необходимо сохранить в файл с разрешением .py в директорию для DAGов. Например, это может быть папка ~/airflow/dags