Шаг 10 из 10

to_parquet

Последний оператор задача которого — конвертировать CSV в Parquet и уложить обратно в S3. Код функции преобразования:

def convert_to_parquet(year_month: str, s3_path: str):
    s3 = S3Hook('minio_id')
    bucket, key = s3.parse_s3_url(s3_path)
    file_path = s3.download_file(key, bucket)

    with gzip.open(file_path, mode='rb') as f:
        df = pd.read_csv(f)

    current_month = dt.datetime.strptime(year_month, '%Y-%m')
    next_month = current_month.replace(month=current_month.month + 1)

    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df = df[
        (df['tpep_pickup_datetime'] >= f'{current_month:%Y-%m-%d}') &
        (df['tpep_pickup_datetime'] < f'{next_month:%Y-%m-%d}')
        ]
    df['pickup_date'] = df['tpep_pickup_datetime'].dt.strftime('%Y-%m-%d')

    s3_path = f's3://nyc-yellow-taxi-parquet-data/yellow_tripdata_{year_month}.parquet'
    bucket, key = s3.parse_s3_url(s3_path)

    with NamedTemporaryFile('wb', delete=False) as f:
        df.to_parquet(f)

    s3.load_file(
        f.name,
        key,
        bucket,
        replace=True,
    )
    return s3_path

Код использует библиотеку pandas для очистки данных и конвертирования в Parquet. Для работы с parquet предварительно необходимо установить библиотеку pyarrow:

pip install pyarrow==3.0.0

Если вы никогда не работали с pandas, то вам поможет мой туториал по работе с этой библиотекой. В коде функции используется всё тот же S3Hook для загрузки файла из S3 (который был сохранён таском download_file), и укладки Parquet в другой бакет S3.

Код DAG

@dag(default_args=default_args,
     schedule_interval='@monthly',
     start_date=datetime(2020, 1, 1),
)
def nyc_taxi_dataset_dag():

    check_file = SimpleHttpOperator(
        method='HEAD',
        endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
        task_id='check_file',
        http_conn_id='nyc_yellow_taxi_id'
    )

    @task
    def download_file():
        context = get_current_context()
        return download_dataset(context['execution_date'].strftime('%Y-%m'))

    @task
    def to_parquet(file_path: str):
        context = get_current_context()
        return convert_to_parquet(context['execution_date'].strftime('%Y-%m'), file_path)

Напомню, что я использую TaskFlow API. Если вам интересно как выглядит код в старом представлении, то сморите в репозитории.

Обратите внимание, что таск to_parquet принимает аргумент file_path. Это путь до S3-объекта, который был сохранён предыдущим таском download_file.

Чтобы передать результат выполнения одного оператора в следующий, необходимо ниже написать следующий код:

file_path = download_file()
parquet_file_path = to_parquet(file_path)

Он совпадает с тем как обычно вызываются функции и результат передаётся дальше. Вся магия "бесшовной" передачи данных от одного оператора к другому происходит за счёт декоратора task. При вызове функции, обернутой в task, возвращается инстанс класса XComArg. Если функция, переданная в PythonOperator, возвращает значение, то по умолчанию оно записывается в XCom. XComArg это класс-обёртка или адаптер, который умеет извлекать это значение, т.к. принимает на вход оператор и контекст (в методе resolve). Поэтому когда в коде мы передаём в функцию to_parquet инстанс класса XComArg (file_path), Airflow понимает что это за класс и за кулисами получает из XCom результат выполнения download_file(путь до загруженного S3-объекта). До появления TaskFlow API это необходимо было делать вручную.

Объявление зависимостей происходит тем же путём что и в первом примере DAG только оперируем мы теперь не операторами, а инстансами класса XComArg:

file_path = download_file()
parquet_file_path = to_parquet(file_path)

check_file >> file_path >> parquet_file_path

Важно не забывать вызывать функции, обернутые в task, чтобы получить инстанс класса XComArg. check_file в этом случае уже является инстансом оператора SimpleHttpOperator.

Чтобы Airflow Scheduler корректно обнаружил DAG, необходимо также вызвать обернутую в декоратор dag функцию и присвоить значение переменной в глобальном пространстве. Полный код DAG ниже:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.http.operators.http import SimpleHttpOperator
from .functions import download_dataset, convert_to_parquet

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args,
     schedule_interval='@monthly',
     start_date=datetime(2020, 1, 1),
)
def nyc_taxi_dataset_dag():

    check_file = SimpleHttpOperator(
        method='HEAD',
        endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
        task_id='check_file',
        http_conn_id='nyc_yellow_taxi_id'
    )

    @task
    def download_file():
        context = get_current_context()
        return download_dataset(context['execution_date'].strftime('%Y-%m'))

    @task
    def to_parquet(file_path: str):
        context = get_current_context()
        return convert_to_parquet(context['execution_date'].strftime('%Y-%m'), file_path)

    file_path = download_file()
    parquet_file_path = to_parquet(file_path)

    check_file >> file_path >> parquet_file_path

nyc_dag = nyc_taxi_dataset_dag()

Комментарии

botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "http://127.0.0.1:9000/nyc-yellow-taxi-raw-data/yellow_tripdata_2021-07.csv.gz?uploads" а почему оно докидывает ?uploads в урл?

куда нужно установить pip install pyarrow==3.0.0 ?

Это нужно устанавливать в виртуальное окружение, где стоит Airflow.