Шаг 4 из 10

download_file

Следующий оператор — PythonOperator с task_id download_file. Его задача загрузить файл с веб-сайта и уложить в S3 в сжатом виде (будем использовать gzip).

PythonOperator принимает объект типа callable, который будет выполнен, поэтому загрузку файла мы вынесем в отдельную функцию. Вот код этой функции:

def download_dataset(year_month: str):
    url = (
        f'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year_month}.csv'
    )
    response = requests.get(url, stream=True)
    response.raise_for_status()

    s3 = S3Hook('aws_connection_id')

    s3_path = f's3://nyc-yellow-taxi-raw-data/yellow_tripdata_{year_month}.csv.gz'
    bucket, key = s3.parse_s3_url(s3_path)

    with NamedTemporaryFile('w', encoding='utf-8', delete=False) as f:
        for chunk in response.iter_lines():
            f.write('{}\n'.format(chunk.decode('utf-8')))
    s3.load_file(f.name, key, bucket, replace=True, gzip=True)

    return s3_path

Я предпочитаю выносить эту логику в отдельный модуль или даже пакет (если одного модуля недостаточно), поэтому в репозитории есть модуль functions.py.

Функция download_dataset принимает 1 аргумент, это строка вида 2020-12: год и месяц. Скачивание файла производится пакетом requests. Для работы с S3 я использую готовый Hook из Airflow — S3Hook.

Хуки это внешние интерфейсы для работы с различными сервисами: базы данных, внешние API ресурсы, распределенные хранилища типа S3, redis, memcached и т.д. Хуки являются строительными блоками операторов и берут на себя всю логику по взаимодействию с хранилищем конфигов и доступов. Используя хуки можно забыть про головную боль с хранением секретной информации в коде (например, паролей).

S3Hook даёт разработчику высокоуровневый интерфейс для работы с AWS S3, а под капотом использует библиотеку boto3. Самый главный аргумент, который нужно передать S3Hook — aws_conn_id. Соединение также необходимо создать через раздел Admin → Connections.

В коде прописан путь до S3 объекта, куда будет сохранён загружаемый файл с данными. Далее с веб-ресурса файл сохраняется во временный файл и загружается на S3 с использованием gzip=True. Функция возвращает путь до S3 объекта, которым воспользуется следующий оператор.

Демонстрацию создания бакетов и доступов в Apache Airflow смотрите в следующем видео.

Комментарии

Используя хуки можно забыть про головную боль с хранением секретной информации в коде (например, паролей). А можно, пож-та, развернуть мысль про то, как мы забудем головную боль с хранением паролей?

Поддерживаю вопрос, тоже не понял пока

В этом случае хуки могут тянуть креды напрямую из хранилища Airflow. Хотя ничто не мешает это делать явно если вы реализуете вне хуков.

Здравствуйте. мы указали delete=False в NamedTemporaryFile. когда нужно удалять временные файлы из /tmp ? делать отдельный dag с bash оператором и командой rm?

Необязательно делать даг, можно удаление реализовать отдельным финальным оператором.