Предположим, что спустя некоторое время вы решаете, что для вас важно получить данные по поездкам на такси в Нью-Йорке за весь 2019 год, но в настройках DAG start_date
начинается с января 2020 года. Разработчики Apache Airflow не рекомендуют менять настройки для существующих DAG, т.к. информация о всех прошлых запусках хранится в базе и изменение start_date
или schedule_interval
могут запутать планировщик Airflow. Если всё таки необходимо изменить настройки, то лучше создать новый DAG, а старый поставить на паузу при необходимости.
Если интервал запусков не меняется, а вам лишь нужно прогрузить прошлые периоды, то можно воспользоваться механизмом backfill. Для этого можно использовать консоль или API:
Пример запуска в консоли:
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
Для прогрузки всех поездок на такси за 2019 год:
airflow dags backfill \
--start-date 2019-01-01 \
--end-date 2019-12-01 \
nyc_taxi_dataset_dag
К API мы ещё вернёмся ближе к концу курса.
sag163 16 Январь 2022
Если при запуске через командную строку pipeline упал с ошибкой, то при повторном запуске он не запускается. Падает... Почему это происходит? [2022-01-16 23:18:20,029] {backfill_job.py:477} ERROR - Task instance <TaskInstance: nyc_taxi_dataset_dag.download_file 2020-04-01 00:00:00+00:00 [failed]> with state failed [2022-01-16 23:18:20,040] {backfill_job.py:477} ERROR - Task instance <TaskInstance: nyc_taxi_dataset_dag.to_parquet 2020-04-01 00:00:00+00:00 [upstream_failed]> with state upstream_failed [2022-01-16 23:18:24,844] {dagrun.py:430} ERROR - Marking run <DagRun nyc_taxi_dataset_dag @ 2020-04-01 00:00:00+00:00: backfill__2020-04-01T00:00:00+00:00, externally triggered: False> failed