Шаг 2 из 3

Backfill

Предположим, что спустя некоторое время вы решаете, что для вас важно получить данные по поездкам на такси в Нью-Йорке за весь 2019 год, но в настройках DAG start_date начинается с января 2020 года. Разработчики Apache Airflow не рекомендуют менять настройки для существующих DAG, т.к. информация о всех прошлых запусках хранится в базе и изменение start_date или schedule_interval могут запутать планировщик Airflow. Если всё таки необходимо изменить настройки, то лучше создать новый DAG, а старый поставить на паузу при необходимости.

Если интервал запусков не меняется, а вам лишь нужно прогрузить прошлые периоды, то можно воспользоваться механизмом backfill. Для этого можно использовать консоль или API:

Пример запуска в консоли:

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

Для прогрузки всех поездок на такси за 2019 год:

airflow dags backfill \
    --start-date 2019-01-01 \
    --end-date 2019-12-01 \
    nyc_taxi_dataset_dag

К API мы ещё вернёмся ближе к концу курса.

Комментарии

Если при запуске через командную строку pipeline упал с ошибкой, то при повторном запуске он не запускается. Падает... Почему это происходит? [2022-01-16 23:18:20,029] {backfill_job.py:477} ERROR - Task instance <TaskInstance: nyc_taxi_dataset_dag.download_file 2020-04-01 00:00:00+00:00 [failed]> with state failed [2022-01-16 23:18:20,040] {backfill_job.py:477} ERROR - Task instance <TaskInstance: nyc_taxi_dataset_dag.to_parquet 2020-04-01 00:00:00+00:00 [upstream_failed]> with state upstream_failed [2022-01-16 23:18:24,844] {dagrun.py:430} ERROR - Marking run <DagRun nyc_taxi_dataset_dag @ 2020-04-01 00:00:00+00:00: backfill__2020-04-01T00:00:00+00:00, externally triggered: False> failed