Задача этого таска скачать датасет и сохранить на локальном диске в папку yellow-taxi-data
. В качестве Target класса будем использовать знакомый нам LocalTarget
, а также принимать на вход 2 параметра:
Для работы с HTTP в Python по традиции используют библиотеку requests. У неё простой и удобный интерфейс. Если вы никогда ранее не работали с ней, рекомендую ознакомиться с моей статьёй Руководство по работе с HTTP в Python. Библиотека requests.
Код загрузки файла выглядит следующим образом:
import requests
def download_dataset(filename: str) -> requests.Response:
url = f'https://s3.amazonaws.com/nyc-tlc/trip+data/{filename}'
response = requests.get(url, stream=True)
response.raise_for_status()
return response
Файлы большие, например, данные за июнь 2019 года весят ~620 МБ в CSV, поэтому скачивание идёт в потоковом режиме. Так как работа идёт с текстовыми данными, удобно считывать их построчно и сразу писать в открытый файл на диск (тем самым экономя оперативную память).
Вот так выглядит код таска:
def get_filename(year: int, month: int) -> str:
return f'yellow_tripdata_{year}-{month:02}.csv'
class DownloadTaxiTripTask(luigi.Task):
year = luigi.IntParameter()
month = luigi.IntParameter()
@property
def filename(self):
return get_filename(self.year, self.month)
def run(self):
self.output().makedirs() # in case path does not exist
response = download_dataset(self.filename)
with self.output().open(mode='w') as f:
for chunk in response.iter_lines():
f.write('{}\n'.format(chunk.decode('utf-8')))
def output(self):
return luigi.LocalTarget(os.path.join('yellow-taxi-data', self.filename))
У таска нет зависимостей, т.к. он является первым во всей цепочке задач (пайплайне). В качестве output
используется LocalTarget. То есть результатом выполнения является csv-файл в директории yellow-taxi-data
.
Метод run
я постарался сделать максимально читабельным:
yellow-taxi-data
, если нет, то таск её создастЕсли речь о том, чтобы иметь возможность менять через UI, то здесь нет такого как в Airflow. Но вполне возможно прописывать пути через переменные окружения.
На текущий момент, данные не доступны по указанной ссылке https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv
Но доступны паркеты - https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-06.parquet
Спасибо за коммент про обновленные датасеты, обновлю материал.
Здравствуйте. данные стали доступны в формате .parquet получилось скачать добавив экземпляр класса NopFormat, т.к. иначе бинарный файл не пишется, в LocalTarget нет режима записи wb, надо передавать параметр format=Nop (+ использовал iter_content()) ... from luigi.format import Nop ... for chunk in response.iter_content(chunk_size=512): f.write(chunk) def output(self): return luigi.LocalTarget(os.path.join('yellow-taxi-data', self.filename), format=Nop) вопрос: какой размер chunk_size лучше использовать? для iter_lines по умолчанию стоит chunk_size=512, а для iter_content по умолчанию chunk_size=1 и скачивается долго, какой размер оптимальный при потоковом скачивании бинарных и обычных строк?
Slava 21 Август 2022
В более менее большом проекте, у нас эти таски будут разнесены на разные модули И примерно везде мы будет ссылаться на директорию yellow-taxi-data, а как решается проблема, когда мы захотим поменять эту директорию? Нарпимре в аэрфлою, это можно сделать через Variables, есть ли в luigi что-нибудь готовое для этого?