Следующий оператор — PythonOperator
с task_id download_file
. Его задача загрузить файл с веб-сайта и уложить в S3 в сжатом виде (будем использовать gzip).
PythonOperator принимает объект типа callable, который будет выполнен, поэтому загрузку файла мы вынесем в отдельную функцию. Вот код этой функции:
def download_dataset(year_month: str):
url = (
f'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year_month}.csv'
)
response = requests.get(url, stream=True)
response.raise_for_status()
s3 = S3Hook('aws_connection_id')
s3_path = f's3://nyc-yellow-taxi-raw-data/yellow_tripdata_{year_month}.csv.gz'
bucket, key = s3.parse_s3_url(s3_path)
with NamedTemporaryFile('w', encoding='utf-8', delete=False) as f:
for chunk in response.iter_lines():
f.write('{}\n'.format(chunk.decode('utf-8')))
s3.load_file(f.name, key, bucket, replace=True, gzip=True)
return s3_path
Я предпочитаю выносить эту логику в отдельный модуль или даже пакет (если одного модуля недостаточно), поэтому в репозитории есть модуль functions.py.
Функция download_dataset
принимает 1 аргумент, это строка вида 2020-12
: год и месяц. Скачивание файла производится пакетом requests. Для работы с S3 я использую готовый Hook из Airflow — S3Hook.
Хуки это внешние интерфейсы для работы с различными сервисами: базы данных, внешние API ресурсы, распределенные хранилища типа S3, redis, memcached и т.д. Хуки являются строительными блоками операторов и берут на себя всю логику по взаимодействию с хранилищем конфигов и доступов. Используя хуки можно забыть про головную боль с хранением секретной информации в коде (например, паролей).
S3Hook даёт разработчику высокоуровневый интерфейс для работы с AWS S3, а под капотом использует библиотеку boto3. Самый главный аргумент, который нужно передать S3Hook — aws_conn_id
. Соединение также необходимо создать через раздел Admin → Connections.
В коде прописан путь до S3 объекта, куда будет сохранён загружаемый файл с данными. Далее с веб-ресурса файл сохраняется во временный файл и загружается на S3 с использованием gzip=True. Функция возвращает путь до S3 объекта, которым воспользуется следующий оператор.
Демонстрацию создания бакетов и доступов в Apache Airflow смотрите в следующем видео.
Поддерживаю вопрос, тоже не понял пока
В этом случае хуки могут тянуть креды напрямую из хранилища Airflow. Хотя ничто не мешает это делать явно если вы реализуете вне хуков.
Здравствуйте. мы указали delete=False в NamedTemporaryFile. когда нужно удалять временные файлы из /tmp ? делать отдельный dag с bash оператором и командой rm?
Необязательно делать даг, можно удаление реализовать отдельным финальным оператором.
Dmitry Kosarevsky 27 Ноябрь 2021
Используя хуки можно забыть про головную боль с хранением секретной информации в коде (например, паролей). А можно, пож-та, развернуть мысль про то, как мы забудем головную боль с хранением паролей?