Hook это классы для работы с внешними сервисами, их можно считать своего рода кирпичиками на которых строятся операторы. Операторы, которые взаимодействуют со сторонними системами, делают это через свои хуки. Например, PostgresOperator использует PostgresHook для запросов в базу, а для работы с S3 мы уже использовали S3Hook.
Почему стоит писать свой Hook? Почему бы весь код работы с сервисом не написать в кастомном Operator классе?
Две основные, на мой взгляд, причины:
Чтобы реализовать свой хук-класс необходимо наследоваться от базового класса BaseHook. У этого класса есть методы для работы с хранилищем доступов (метод get_connection
). Ему нужно передать ваш connection_id и он вернёт все настройки по нему.
Для получения курсов валют мы будем использовать сервис CurrencyScoop, endpoint на который будем слать запросы: https://api.currencyscoop.com/v1/historical
Он принимает несколько параметров:
Также ко всем API запросам необходимо дополнительно передавать параметр api_key
с ключом доступа. Ключ мы как обычно будем хранить в базе данных Airflow и получать его по названию connection_id.
Для отправки запросов к API я буду использовать библиотеку requests. Если вы не работали с ней, то обратите внимание на мою заметку в блоге по работе с HTTP в Python.
Для начала необходимо создать соединение в Admin → Connection, название cur_scoop_conn_id
:
Код хука:
import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
class CurrencyScoopHook(BaseHook):
def __init__(self, currency_conn_id: str):
super().__init__()
self.conn_id = currency_conn_id
def get_rate(self, date, base_currency: str, currency: str):
url = 'https://api.currencyscoop.com/v1/historical'
params = {
'base': base_currency.upper(),
'symbols': currency.upper(),
'api_key': self._get_api_key(),
'date': str(date),
}
response = requests.get(url, params=params)
response.raise_for_status()
return response.json()['response']['rates'][currency]
def _get_api_key(self):
conn = self.get_connection(self.conn_id)
if not conn.password:
raise AirflowException('Missing API key (password) in connection settings')
return conn.password
Единственный параметр, который принимает класс CurrencyScoopHook при инициализации — currency_conn_id
. У класса есть 1 публичный метод — get_rate
, он возвращает стоимость base_currency
в валюте currency
на момент даты date
.
Пардон, безусловно без скобок. Это какая-то проблема при копировании данных из Notion. Изначально уроки писал в Notion. Спасибо!
Несовсем понятно как правильно организовать структурую. Создавать папку с своими Hooks. Или тупо бросать в текущую папку, помойму это некрасиво?? Поправьте пожалуйста, если я не прав и скажите как правильно.
Если речь про конкретно этот пример, то у меня хук лежит рядом с оператором в пакете с DAGом: https://github.com/adilkhash/apache-airflow-course-materials/tree/master/dags/exchange_rates. Если вы спрашиваете в целом про хорошую практику работы, то зависит от того переиспользуется ли хук в других операторах или нет. Если переиспользуется, то хуки можно вынести в отдельный пакет и подключать через sys.path или plugins либо вообще сделать отдельными зависимостями.
Dmitry Kosarevsky 16 Январь 2022
А урл в угловых скобках точно нужно писать? У меня с угловыми падало с ошибкой, когда убрал их - всё взлетело и заколосилось =) url = 'https://api.currencyscoop.com/v1/historical'