Шаг 9 из 10

Код оператора download_file и DAG:

# опускаю импорты

@dag(default_args=default_args,
     schedule_interval='@monthly',
     start_date=datetime(2020, 1, 1),
)
def nyc_taxi_dataset_dag():

    check_file = SimpleHttpOperator(
        method='HEAD',
        endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
        task_id='check_file',
        http_conn_id='nyc_yellow_taxi_id'
    )

    @task
    def download_file():
        context = get_current_context()
        return download_dataset(context['execution_date'].strftime('%Y-%m'))

Обратите внимание, что декорируемая функция вызывает get_current_context. Эта функция возвращает контекст выполнения DAG, в нём хранится различная полезная информация в том числе настройки, текущая дата выполнения (execution_date) и многое другое. Нам нужна текущая дата выполнения для передачи года и месяца в функцию download_dataset. Вызов get_current_context вне оператора вернёт ошибку, т.к. отсутствует контекст.

Комментарии