Последний оператор задача которого — конвертировать CSV в Parquet и уложить обратно в S3. Код функции преобразования:
def convert_to_parquet(year_month: str, s3_path: str):
s3 = S3Hook('minio_id')
bucket, key = s3.parse_s3_url(s3_path)
file_path = s3.download_file(key, bucket)
with gzip.open(file_path, mode='rb') as f:
df = pd.read_csv(f)
current_month = dt.datetime.strptime(year_month, '%Y-%m')
next_month = current_month.replace(month=current_month.month + 1)
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df = df[
(df['tpep_pickup_datetime'] >= f'{current_month:%Y-%m-%d}') &
(df['tpep_pickup_datetime'] < f'{next_month:%Y-%m-%d}')
]
df['pickup_date'] = df['tpep_pickup_datetime'].dt.strftime('%Y-%m-%d')
s3_path = f's3://nyc-yellow-taxi-parquet-data/yellow_tripdata_{year_month}.parquet'
bucket, key = s3.parse_s3_url(s3_path)
with NamedTemporaryFile('wb', delete=False) as f:
df.to_parquet(f)
s3.load_file(
f.name,
key,
bucket,
replace=True,
)
return s3_path
Код использует библиотеку pandas для очистки данных и конвертирования в Parquet. Для работы с parquet предварительно необходимо установить библиотеку pyarrow:
pip install pyarrow==3.0.0
Если вы никогда не работали с pandas, то вам поможет мой туториал по работе с этой библиотекой. В коде функции используется всё тот же S3Hook
для загрузки файла из S3 (который был сохранён таском download_file), и укладки Parquet в другой бакет S3.
Код DAG
@dag(default_args=default_args,
schedule_interval='@monthly',
start_date=datetime(2020, 1, 1),
)
def nyc_taxi_dataset_dag():
check_file = SimpleHttpOperator(
method='HEAD',
endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
task_id='check_file',
http_conn_id='nyc_yellow_taxi_id'
)
@task
def download_file():
context = get_current_context()
return download_dataset(context['execution_date'].strftime('%Y-%m'))
@task
def to_parquet(file_path: str):
context = get_current_context()
return convert_to_parquet(context['execution_date'].strftime('%Y-%m'), file_path)
Напомню, что я использую TaskFlow API. Если вам интересно как выглядит код в старом представлении, то сморите в репозитории.
Обратите внимание, что таск to_parquet
принимает аргумент file_path
. Это путь до S3-объекта, который был сохранён предыдущим таском download_file
.
Чтобы передать результат выполнения одного оператора в следующий, необходимо ниже написать следующий код:
file_path = download_file()
parquet_file_path = to_parquet(file_path)
Он совпадает с тем как обычно вызываются функции и результат передаётся дальше. Вся магия "бесшовной" передачи данных от одного оператора к другому происходит за счёт декоратора task. При вызове функции, обернутой в task, возвращается инстанс класса XComArg. Если функция, переданная в PythonOperator
, возвращает значение, то по умолчанию оно записывается в XCom. XComArg это класс-обёртка или адаптер, который умеет извлекать это значение, т.к. принимает на вход оператор и контекст (в методе resolve
). Поэтому когда в коде мы передаём в функцию to_parquet
инстанс класса XComArg (file_path
), Airflow понимает что это за класс и за кулисами получает из XCom результат выполнения download_file
(путь до загруженного S3-объекта). До появления TaskFlow API это необходимо было делать вручную.
Объявление зависимостей происходит тем же путём что и в первом примере DAG только оперируем мы теперь не операторами, а инстансами класса XComArg:
file_path = download_file()
parquet_file_path = to_parquet(file_path)
check_file >> file_path >> parquet_file_path
Важно не забывать вызывать функции, обернутые в task, чтобы получить инстанс класса XComArg. check_file
в этом случае уже является инстансом оператора SimpleHttpOperator
.
Чтобы Airflow Scheduler корректно обнаружил DAG, необходимо также вызвать обернутую в декоратор dag функцию и присвоить значение переменной в глобальном пространстве. Полный код DAG ниже:
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.http.operators.http import SimpleHttpOperator
from .functions import download_dataset, convert_to_parquet
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args,
schedule_interval='@monthly',
start_date=datetime(2020, 1, 1),
)
def nyc_taxi_dataset_dag():
check_file = SimpleHttpOperator(
method='HEAD',
endpoint='yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv',
task_id='check_file',
http_conn_id='nyc_yellow_taxi_id'
)
@task
def download_file():
context = get_current_context()
return download_dataset(context['execution_date'].strftime('%Y-%m'))
@task
def to_parquet(file_path: str):
context = get_current_context()
return convert_to_parquet(context['execution_date'].strftime('%Y-%m'), file_path)
file_path = download_file()
parquet_file_path = to_parquet(file_path)
check_file >> file_path >> parquet_file_path
nyc_dag = nyc_taxi_dataset_dag()
куда нужно установить pip install pyarrow==3.0.0 ?
Это нужно устанавливать в виртуальное окружение, где стоит Airflow.
Dmitry Kosarevsky 28 Ноябрь 2021
botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "http://127.0.0.1:9000/nyc-yellow-taxi-raw-data/yellow_tripdata_2021-07.csv.gz?uploads" а почему оно докидывает ?uploads в урл?