Шаг 3 из 3

insert_rate

Для таска insert_rate также будем использовать PostgresOperator. Его задача — добавить запись о курсе валют в базу данных после успешного выполнения предыдущего таска get_rate.

Вставка данных будет происходить через UPSERT. Нередко бывают сбои и чтобы не допустить дублирования данных все таски в Airflow должны быть идемпотентными. В случае с таском insert_rate при обнаружении дубля по 3-м колонкам: base, currency и date, произойдёт обновление колонки rate вместо добавления ещё одной. Пример SQL запроса:

INSERT INTO
    currency_exchange_rates
VALUES ('USD', 'KZT', 420.55, '2020-01-01')
ON CONFLICT (base, currency, date) DO
    UPDATE
        SET rate = excluded.rate;

Если в таблице currency_exchange_rates уже будет запись обменного курса USD/KZT за 1 января, то у этой строчки будет обновлена колонка rate на значение 420.55. Выражение excluded.rate ссылается на значение обменного курса, предложенного для вставки (новое значение).

Код оператора:

insert_rate = PostgresOperator(
    task_id='insert_rate',
    postgres_conn_id='postgres_default',
    sql='sql/insert_rate.sql',
    params={
        'base_currency': 'USD',
        'currency': 'KZT',
    }
)

Обратите внимание на аргумент params, он позволяет передавать значения в SQL-шаблон. Конкретно в этом случае значения у нас статичные, но в конце модуля будет пример с созданием динамических тасков, где это актуально. Параметры в шаблоне доступны через переменную params. К ним можно обратиться так:

{{ params.base_currency }} 
{{ params.currency }} 

А теперь давайте взглянем как выглядит файл с SQL запросом:

Код SQL шаблона:

INSERT INTO
    currency_exchange_rates
VALUES ('{{ params.base_currency }}', '{{ params.currency }}', {{ ti.xcom_pull(task_ids="get_rate") }}, '{{ execution_date.strftime("%Y-%m-%d") }}')
ON CONFLICT (base, currency, date) DO
    UPDATE
        SET rate = excluded.rate;

Переменная execution_date вам уже хорошо знакома, с params тоже должно быть всё понятно. А что по поводу выражения?

{{ ti.xcom_pull(task_ids="get_rate") }}

Переменная ti также как и execution_date доступна в контексте выполнения DAG и ссылается на инстанс текущего таска (PostgresOperator), инстанс также доступен под названием task_instance.

Более подробно про внутреннее устройство XCom и как с ним работать я писал в заметке Apache Airflow и XCom в блоге.

Комментарии

в документации к PostgresOperator в списке параметров есть вот такая запись --- "parameters – (optional) the parameters to render the SQL query with." это тот же параметр(variable) params который вы прописываете? или переменную params можно прописать в любом операторе?

Да, верно, конкретно для PostgresOperator это и есть params с передачей словаря. Не у всех операторов он есть, также он может называться иначе.

спасибо