Введение в Data Engineering: дата-пайплайны: Практические задачи построения пайплайнов / Параллельное выполнение тасков
Итак, что необходимо, чтобы написать задачу, которая будет запускать пайплайны по переданному периоду месяцев?
WrapperTask
, и переопределим метод requires
%Y-%m
, для этого в luigi есть параметр MonthParameter.requires
в цикле мы будем формировать год-месяц по которому необходимо скачать данные, агрегировать и уложить в БД SQLite.Вот как это выглядит:
import luigi
from dateutil.relativedelta import relativedelta
class YellowTaxiDateRangeTask(luigi.WrapperTask):
start = luigi.MonthParameter()
stop = luigi.MonthParameter()
def requires(self):
current_month = self.start
while current_month <= self.stop:
yield CopyTaxiTripData2SQLite(date=current_month)
current_month += relativedelta(months=1)
P.S. В тасках нашего пайплайна я заменил параметры year
и month
на 1 параметр date
типа MonthParameter
.
Urev Oleg 4 Октябрь 2022
Использование WrapperTask лучше\хуже\совсем по-другому, чем class CopyTaxiTripData2SQLite(CopyToTable): years = luigi.ListParameter() months = luigi.ListParameter() def requires(self): return [DownloadTaxiTripTask(year, month) for year in self.years for month in self.months] Или такой запуск и зависимость от только от одного : @requires(DownloadTaxiTripTask) class CopyTaxiTripData2SQLite(CopyToTable): year = luigi.IntParameter() month = luigi.IntParameter() ...... luigi.build(tasks=[CopyTaxiTripData2SQLite(year=2022, month=1), CopyTaxiTripData2SQLite(year=2022, month=2), CopyTaxiTripData2SQLite(year=2022, month=3), CopyTaxiTripData2SQLite(year=2022, month=4), CopyTaxiTripData2SQLite(year=2022, month=5), CopyTaxiTripData2SQLite(year=2022, month=6), CopyTaxiTripData2SQLite(year=2022, month=7), CopyTaxiTripData2SQLite(year=2022, month=8), CopyTaxiTripData2SQLite(year=2022, month=9), CopyTaxiTripData2SQLite(year=2022, month=10)], workers=5, local_scheduler=True)