Step 3 of 5

Итак, что необходимо, чтобы написать задачу, которая будет запускать пайплайны по переданному периоду месяцев?

  • новый таск наследуем от WrapperTask, и переопределим метод requires
  • таск-обёртка будет принимать 2 аргумента:
    • start — период начала, в формате %Y-%m, для этого в luigi есть параметр MonthParameter.
    • stop — окончание периода (включительно).
  • в методе requires в цикле мы будем формировать год-месяц по которому необходимо скачать данные, агрегировать и уложить в БД SQLite.

Вот как это выглядит:

import luigi
from dateutil.relativedelta import relativedelta


class YellowTaxiDateRangeTask(luigi.WrapperTask):
    start = luigi.MonthParameter()
    stop = luigi.MonthParameter()

    def requires(self):
        current_month = self.start
        while current_month <= self.stop:
            yield CopyTaxiTripData2SQLite(date=current_month)
            current_month += relativedelta(months=1)

P.S. В тасках нашего пайплайна я заменил параметры year и month на 1 параметр date типа MonthParameter.

Comments

Использование WrapperTask лучше\хуже\совсем по-другому, чем class CopyTaxiTripData2SQLite(CopyToTable): years = luigi.ListParameter() months = luigi.ListParameter() def requires(self): return [DownloadTaxiTripTask(year, month) for year in self.years for month in self.months] Или такой запуск и зависимость от только от одного : @requires(DownloadTaxiTripTask) class CopyTaxiTripData2SQLite(CopyToTable): year = luigi.IntParameter() month = luigi.IntParameter() ...... luigi.build(tasks=[CopyTaxiTripData2SQLite(year=2022, month=1), CopyTaxiTripData2SQLite(year=2022, month=2), CopyTaxiTripData2SQLite(year=2022, month=3), CopyTaxiTripData2SQLite(year=2022, month=4), CopyTaxiTripData2SQLite(year=2022, month=5), CopyTaxiTripData2SQLite(year=2022, month=6), CopyTaxiTripData2SQLite(year=2022, month=7), CopyTaxiTripData2SQLite(year=2022, month=8), CopyTaxiTripData2SQLite(year=2022, month=9), CopyTaxiTripData2SQLite(year=2022, month=10)], workers=5, local_scheduler=True)