Шаг 1 из 1

Hook это классы для работы с внешними сервисами, их можно считать своего рода кирпичиками на которых строятся операторы. Операторы, которые взаимодействуют со сторонними системами, делают это через свои хуки. Например, PostgresOperator использует PostgresHook для запросов в базу, а для работы с S3 мы уже использовали S3Hook.

Почему стоит писать свой Hook? Почему бы весь код работы с сервисом не написать в кастомном Operator классе?

Две основные, на мой взгляд, причины:

  1. Разделение ответственности или Single Responsibility Principle
  2. Работа с доступами в Airflow

Чтобы реализовать свой хук-класс необходимо наследоваться от базового класса BaseHook. У этого класса есть методы для работы с хранилищем доступов (метод get_connection). Ему нужно передать ваш connection_id и он вернёт все настройки по нему.

Для получения курсов валют мы будем использовать сервис CurrencyScoop, endpoint на который будем слать запросы: https://api.currencyscoop.com/v1/historical

Он принимает несколько параметров:

  • base — базовая валюта
  • symbols — список валют в которых выражается стоимость базовой валюты
  • date — дата обменного курса

Также ко всем API запросам необходимо дополнительно передавать параметр api_key с ключом доступа. Ключ мы как обычно будем хранить в базе данных Airflow и получать его по названию connection_id.

Для отправки запросов к API я буду использовать библиотеку requests. Если вы не работали с ней, то обратите внимание на мою заметку в блоге по работе с HTTP в Python.

Для начала необходимо создать соединение в Admin → Connection, название cur_scoop_conn_id:

Код хука:

import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

class CurrencyScoopHook(BaseHook):

    def __init__(self, currency_conn_id: str):
        super().__init__()
        self.conn_id = currency_conn_id

    def get_rate(self, date, base_currency: str, currency: str):
        url = 'https://api.currencyscoop.com/v1/historical'
        params = {
            'base': base_currency.upper(),
            'symbols': currency.upper(),
            'api_key': self._get_api_key(),
            'date': str(date),
        }
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.json()['response']['rates'][currency]

    def _get_api_key(self):
        conn = self.get_connection(self.conn_id)
        if not conn.password:
            raise AirflowException('Missing API key (password) in connection settings')
        return conn.password

Единственный параметр, который принимает класс CurrencyScoopHook при инициализации — currency_conn_id. У класса есть 1 публичный метод — get_rate, он возвращает стоимость base_currency в валюте currency на момент даты date.

Комментарии

А урл в угловых скобках точно нужно писать? У меня с угловыми падало с ошибкой, когда убрал их - всё взлетело и заколосилось =) url = 'https://api.currencyscoop.com/v1/historical'

Пардон, безусловно без скобок. Это какая-то проблема при копировании данных из Notion. Изначально уроки писал в Notion. Спасибо!

Несовсем понятно как правильно организовать структурую. Создавать папку с своими Hooks. Или тупо бросать в текущую папку, помойму это некрасиво?? Поправьте пожалуйста, если я не прав и скажите как правильно.

Если речь про конкретно этот пример, то у меня хук лежит рядом с оператором в пакете с DAGом: https://github.com/adilkhash/apache-airflow-course-materials/tree/master/dags/exchange_rates. Если вы спрашиваете в целом про хорошую практику работы, то зависит от того переиспользуется ли хук в других операторах или нет. Если переиспользуется, то хуки можно вынести в отдельный пакет и подключать через sys.path или plugins либо вообще сделать отдельными зависимостями.