Шаг 5 из 5

DAG

Всё готово для реализации DAGа на сенсоре. Необходимо заменить SimpleHttpOperator на HttpSensor и добавить 2 дополнительных аргумента: mode и poke_interval. Делать проверку чаще 1 раза в сутки нет мысла, но можно делать реже. Я для простоты указал интервал раз в 24 часа в секундах (60 * 60 * 24).

import datetime as dt

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.http.sensors.http import HttpSensor

from nyc_taxi.functions import download_dataset, convert_to_parquet

default_args = {
    'owner': 'airflow',
}

with DAG(
    start_date=dt.datetime(2021, 1, 1),
    dag_id='nyc_taxi_2021_dag',
    schedule_interval='@monthly',
    default_args=default_args,
) as dag:

    check_if_exists = HttpSensor(
        method='HEAD',
        endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
        http_conn_id='nyc_yellow_taxi_id',
        task_id='check_if_exists',
        poke_interval=60 * 60 * 24,  # раз в 24 часа
        mode='reschedule',
    )

    @task
    def download_file():
        context = get_current_context()
        return download_dataset(context['execution_date'].strftime('%Y-%m'))

    @task
    def to_parquet(file_path: str):
        context = get_current_context()
        return convert_to_parquet(context['execution_date'].strftime('%Y-%m'), file_path)

    file_path = download_file()
    parquet_file_path = to_parquet(file_path)

    check_if_exists >> file_path >> parquet_file_path

Ссылка на DAG в репозитории — nyc_taxi_2021_sensor.py

Комментарии