Шаг 1 из 1

Чтобы создать свой оператор необходимо наследоваться от базового класса BaseOperator:

from airflow.models.baseoperator import BaseOperator

class CurrencyScoopOperator(BaseOperator):
    pass

Все стандартные операторы Airflow учитывают параметры DAG в котором они вызываются. Ранее я уже писал о том, что настройки в default_args, передаваемые инстансу DAG, наследуется всеми операторами. Чтобы сохранить такое поведение, необходимо метод __init__ оператора обернуть в декоратор apply_defaults:

from airflow.utils.decorators import apply_defaults
from airflow.models.baseoperator import BaseOperator

class CurrencyScoopOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            base_currency: str,
            currency: str,
            conn_id: str = 'currency_scoop_conn_id',
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.base_currency = base_currency
        self.currency = currency

Оператор дополнительно будет принимать 3 аргумента:

  1. base_currency — базовый код валюты
  2. currency — валюта в которой отражается стоимость базовой
  3. conn_id — ключ к настройкам доступа

Логику работы оператора необходимо описать в методе execute , который принимает всего 1 аргумент — контекст.

Полный код оператора:

from typing import Any

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

from .hook import CurrencyScoopHook

class CurrencyScoopOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            base_currency: str,
            currency: str,
            conn_id: str = 'currency_scoop_conn_id',
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.base_currency = base_currency
        self.currency = currency

    def execute(self, context: Any):
        api = CurrencyScoopHook(self.conn_id)
        return api.get_rate(context['execution_date'].date(), self.base_currency, self.currency)

Контекст нам нужен для получения даты выполнения DAGа. В методе execute мы создаём инстанс нашего Hook и вызываем метод получения курса валюты. Напомню, что если метод execute у оператора возвращает значение, оно автоматически записывается в XCom.

Комментарии