Шаг 3 из 9

AggregateTaxiTripTask

Задача следующего в цепочке таска — сгруппировать данные по полю и посчитать метрики, а далее сохранить результат на диск. Как я ранее упоминал, группироваться будем по полю pickup_date и считать сумму чаевых и общую стоимость всех поездок. Оговорюсь, что для работы данными я использую библиотеку pandas. Так или иначе при работе с данными в экосистеме Python вам придётся с ней подружиться. В начале 2017 года я писал вводную статью про анализ данных в pandas, её достаточно, чтобы понять код таска.

Код группировки:

def group_by_pickup_date(
    file_object, group_by='pickup_date', metrics: List[str] = None
) -> pd.DataFrame:
    if metrics is None:
        metrics = ['tip_amount', 'total_amount']

    df = pd.read_csv(file_object)
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['pickup_date'] = df['tpep_pickup_datetime'].dt.strftime('%Y-%m-%d')
    df = df.groupby(group_by)[metrics].sum().reset_index()
    return df

Комментарии

А нет такого, что эта функция не должна ничего возвращать? Ну например, если аггрегация прошла успешна, df получился, лучше результат этой работы сразу сохранить. И тогда если следующая задача отвалится, то при повторном использовании, аггрегировать заново уже не надо будет

Точнее возвращать не сам результат аггрегации, а лишь ссылку на этот результат, т.е. в простейшем случае, что будет ссылка на файлик

Извиняюсь, в таске сделано через файлик, получается агрегация не будет выполняться дважды, если файлик уже лежит

Да, всё верно. Функцию вызывает Luigi Task и он же и сохраняет результат.