Задача следующего в цепочке таска — сгруппировать данные по полю и посчитать метрики, а далее сохранить результат на диск. Как я ранее упоминал, группироваться будем по полю pickup_date
и считать сумму чаевых и общую стоимость всех поездок. Оговорюсь, что для работы данными я использую библиотеку pandas. Так или иначе при работе с данными в экосистеме Python вам придётся с ней подружиться. В начале 2017 года я писал вводную статью про анализ данных в pandas, её достаточно, чтобы понять код таска.
Код группировки:
def group_by_pickup_date(
file_object, group_by='pickup_date', metrics: List[str] = None
) -> pd.DataFrame:
if metrics is None:
metrics = ['tip_amount', 'total_amount']
df = pd.read_csv(file_object)
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['pickup_date'] = df['tpep_pickup_datetime'].dt.strftime('%Y-%m-%d')
df = df.groupby(group_by)[metrics].sum().reset_index()
return df
Точнее возвращать не сам результат аггрегации, а лишь ссылку на этот результат, т.е. в простейшем случае, что будет ссылка на файлик
Извиняюсь, в таске сделано через файлик, получается агрегация не будет выполняться дважды, если файлик уже лежит
Да, всё верно. Функцию вызывает Luigi Task и он же и сохраняет результат.
Slava 21 Август 2022
А нет такого, что эта функция не должна ничего возвращать? Ну например, если аггрегация прошла успешна, df получился, лучше результат этой работы сразу сохранить. И тогда если следующая задача отвалится, то при повторном использовании, аггрегировать заново уже не надо будет