Step 2 of 9

DownloadTaxiTripTask

Задача этого таска скачать датасет и сохранить на локальном диске в папку yellow-taxi-data. В качестве Target класса будем использовать знакомый нам LocalTarget, а также принимать на вход 2 параметра:

  1. year
  2. month

Для работы с HTTP в Python по традиции используют библиотеку requests. У неё простой и удобный интерфейс. Если вы никогда ранее не работали с ней, рекомендую ознакомиться с моей статьёй Руководство по работе с HTTP в Python. Библиотека requests.

Код загрузки файла выглядит следующим образом:

import requests

def download_dataset(filename: str) -> requests.Response:
    url = f'https://s3.amazonaws.com/nyc-tlc/trip+data/{filename}'
    response = requests.get(url, stream=True)
    response.raise_for_status()
    return response

Файлы большие, например, данные за июнь 2019 года весят ~620 МБ в CSV, поэтому скачивание идёт в потоковом режиме. Так как работа идёт с текстовыми данными, удобно считывать их построчно и сразу писать в открытый файл на диск (тем самым экономя оперативную память).

Вот так выглядит код таска:

def get_filename(year: int, month: int) -> str:
    return f'yellow_tripdata_{year}-{month:02}.csv'


class DownloadTaxiTripTask(luigi.Task):
    year = luigi.IntParameter()
    month = luigi.IntParameter()

    @property
    def filename(self):
        return get_filename(self.year, self.month)

    def run(self):

        self.output().makedirs()  # in case path does not exist
        response = download_dataset(self.filename)

        with self.output().open(mode='w') as f:
            for chunk in response.iter_lines():
                f.write('{}\n'.format(chunk.decode('utf-8')))

    def output(self):
        return luigi.LocalTarget(os.path.join('yellow-taxi-data', self.filename))

У таска нет зависимостей, т.к. он является первым во всей цепочке задач (пайплайне). В качестве output используется LocalTarget. То есть результатом выполнения является csv-файл в директории yellow-taxi-data.

Метод run я постарался сделать максимально читабельным:

  • перед стартом идёт проверка существует ли папка yellow-taxi-data, если нет, то таск её создаст
  • следом вызывается функция скачивания файла
  • далее открывается файл, указанный в LocalTarget, в режиме записи и результат ответа построчно записывается в этот файл

Comments

В более менее большом проекте, у нас эти таски будут разнесены на разные модули И примерно везде мы будет ссылаться на директорию yellow-taxi-data, а как решается проблема, когда мы захотим поменять эту директорию? Нарпимре в аэрфлою, это можно сделать через Variables, есть ли в luigi что-нибудь готовое для этого?

Если речь о том, чтобы иметь возможность менять через UI, то здесь нет такого как в Airflow. Но вполне возможно прописывать пути через переменные окружения.

На текущий момент, данные не доступны по указанной ссылке https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv

Но доступны паркеты - https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-06.parquet

Спасибо за коммент про обновленные датасеты, обновлю материал.

Здравствуйте. данные стали доступны в формате .parquet получилось скачать добавив экземпляр класса NopFormat, т.к. иначе бинарный файл не пишется, в LocalTarget нет режима записи wb, надо передавать параметр format=Nop (+ использовал iter_content()) ... from luigi.format import Nop ... for chunk in response.iter_content(chunk_size=512): f.write(chunk) def output(self): return luigi.LocalTarget(os.path.join('yellow-taxi-data', self.filename), format=Nop) вопрос: какой размер chunk_size лучше использовать? для iter_lines по умолчанию стоит chunk_size=512, а для iter_content по умолчанию chunk_size=1 и скачивается долго, какой размер оптимальный при потоковом скачивании бинарных и обычных строк?