Чтобы создать свой оператор необходимо наследоваться от базового класса BaseOperator:
from airflow.models.baseoperator import BaseOperator
class CurrencyScoopOperator(BaseOperator):
pass
Все стандартные операторы Airflow учитывают параметры DAG в котором они вызываются. Ранее я уже писал о том, что настройки в default_args
, передаваемые инстансу DAG, наследуется всеми операторами. Чтобы сохранить такое поведение, необходимо метод __init__
оператора обернуть в декоратор apply_defaults
:
from airflow.utils.decorators import apply_defaults
from airflow.models.baseoperator import BaseOperator
class CurrencyScoopOperator(BaseOperator):
@apply_defaults
def __init__(
self,
base_currency: str,
currency: str,
conn_id: str = 'currency_scoop_conn_id',
**kwargs) -> None:
super().__init__(**kwargs)
self.conn_id = conn_id
self.base_currency = base_currency
self.currency = currency
Оператор дополнительно будет принимать 3 аргумента:
Логику работы оператора необходимо описать в методе execute
, который принимает всего 1 аргумент — контекст.
Полный код оператора:
from typing import Any
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from .hook import CurrencyScoopHook
class CurrencyScoopOperator(BaseOperator):
@apply_defaults
def __init__(
self,
base_currency: str,
currency: str,
conn_id: str = 'currency_scoop_conn_id',
**kwargs) -> None:
super().__init__(**kwargs)
self.conn_id = conn_id
self.base_currency = base_currency
self.currency = currency
def execute(self, context: Any):
api = CurrencyScoopHook(self.conn_id)
return api.get_rate(context['execution_date'].date(), self.base_currency, self.currency)
Контекст нам нужен для получения даты выполнения DAGа. В методе execute
мы создаём инстанс нашего Hook и вызываем метод получения курса валюты. Напомню, что если метод execute
у оператора возвращает значение, оно автоматически записывается в XCom.
адексей 25 Июль 2024
Не понимаю в какой папке и где это создавать Как хук так и оператор