Apache Airflow 2.2: практический курс: New York Yellow Taxi Data Pipeline / Операторы DAG
Код оператора download_file и DAG:
# опускаю импорты
@dag(default_args=default_args,
schedule_interval='@monthly',
start_date=datetime(2020, 1, 1),
)
def nyc_taxi_dataset_dag():
check_file = SimpleHttpOperator(
method='HEAD',
endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
task_id='check_file',
http_conn_id='nyc_yellow_taxi_id'
)
@task
def download_file():
context = get_current_context()
return download_dataset(context['execution_date'].strftime('%Y-%m'))
Обратите внимание, что декорируемая функция вызывает get_current_context. Эта функция возвращает контекст выполнения DAG, в нём хранится различная полезная информация в том числе настройки, текущая дата выполнения (execution_date) и многое другое. Нам нужна текущая дата выполнения для передачи года и месяца в функцию download_dataset
. Вызов get_current_context
вне оператора вернёт ошибку, т.к. отсутствует контекст.