Планировщик Airflow ищет все возможные DAG по наличию инстанса объекта DAG в модуле (по пути, указанному в параметре dags_folder
в конфиге airflow.cfg
). Он должен быть доступен при импорте модуля/пакета планировщиком, поэтому его необходимо определять в глобальном пространстве имён. Также у каждого DAG должен быть уникальный dag_id
, именно его вы видите в списке всех DAG, когда заходите в панель управления Airflow.
import random
import datetime as dt
from airflow.models import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2021, 1, 20),
'retries': 2,
'retry_delay': dt.timedelta(seconds=10),
}
def random_dice():
val = random.randint(1, 6)
if val % 2 != 0:
raise ValueError(f'Odd {val}')
with DAG(dag_id='first_dag',
schedule_interval='@daily',
default_args=default_args) as dag:
dice = PythonOperator(
task_id='random_dice',
python_callable=random_dice,
dag=dag,
)
Класс DAG умеет работать с контекстным менеджером with
. В коде я создаю инстанс класса DAG, передаю ему уникальный dag_id
, расписание запусков и параметры по умолчанию через default_args
. Параметры по умолчанию удобны, когда вы хотите одни и те же настройки передать всем операторам, входящим в DAG. К слову, вы также можете задавать индивидуальные настройки для каждого оператора. Например, количество перезапусков в случае ошибок, retries
, или таймаут перед повторным запуском, retry_delay
. Параметр start_date
также может быть у каждого оператора свой.
У нас с вами получился простой DAG с одним лишь оператором — PythonOperator
. Этот оператор принимает на вход callable (функцию, исполняемый класс или lambda выражение). Каждый оператор должен иметь уникальный task_id
и dag
к которому он относится.
Функция random_dice
случайным образом генерирует число от 1 до 6, и в случае если выпало нечетное число возбуждает исключение ValueError
. Обратите внимание, что в default_args
мы задали настройки для перезапуска операторов в случае ошибок — retries=2
, также таймаут между повторами равен 10 секундам, retry_delay
. Это как раз поможет вам понять как Airflow ведёт себя в случае появления ошибок при выполнении кода.
Здравствуйте! Куда вставлять приведенный выше код?
Добрый день! Его необходимо сохранить в файл с разрешением .py в директорию для DAGов. Например, это может быть папка ~/airflow/dags
pavel_durugyan 1 Июнь 2022
Здравствуйте! А почему именно from airflow.models import DAG, а не from airflow import DAG?