Для таска insert_rate
также будем использовать PostgresOperator. Его задача — добавить запись о курсе валют в базу данных после успешного выполнения предыдущего таска get_rate
.
Вставка данных будет происходить через UPSERT. Нередко бывают сбои и чтобы не допустить дублирования данных все таски в Airflow должны быть идемпотентными. В случае с таском insert_rate
при обнаружении дубля по 3-м колонкам: base, currency и date, произойдёт обновление колонки rate вместо добавления ещё одной. Пример SQL запроса:
INSERT INTO
currency_exchange_rates
VALUES ('USD', 'KZT', 420.55, '2020-01-01')
ON CONFLICT (base, currency, date) DO
UPDATE
SET rate = excluded.rate;
Если в таблице currency_exchange_rates уже будет запись обменного курса USD/KZT за 1 января, то у этой строчки будет обновлена колонка rate на значение 420.55. Выражение excluded.rate
ссылается на значение обменного курса, предложенного для вставки (новое значение).
Код оператора:
insert_rate = PostgresOperator(
task_id='insert_rate',
postgres_conn_id='postgres_default',
sql='sql/insert_rate.sql',
params={
'base_currency': 'USD',
'currency': 'KZT',
}
)
Обратите внимание на аргумент params, он позволяет передавать значения в SQL-шаблон. Конкретно в этом случае значения у нас статичные, но в конце модуля будет пример с созданием динамических тасков, где это актуально. Параметры в шаблоне доступны через переменную params. К ним можно обратиться так:
{{ params.base_currency }}
{{ params.currency }}
А теперь давайте взглянем как выглядит файл с SQL запросом:
Код SQL шаблона:
INSERT INTO
currency_exchange_rates
VALUES ('{{ params.base_currency }}', '{{ params.currency }}', {{ ti.xcom_pull(task_ids="get_rate") }}, '{{ execution_date.strftime("%Y-%m-%d") }}')
ON CONFLICT (base, currency, date) DO
UPDATE
SET rate = excluded.rate;
Переменная execution_date вам уже хорошо знакома, с params тоже должно быть всё понятно. А что по поводу выражения?
{{ ti.xcom_pull(task_ids="get_rate") }}
Переменная ti
также как и execution_date
доступна в контексте выполнения DAG и ссылается на инстанс текущего таска (PostgresOperator), инстанс также доступен под названием task_instance
.
Более подробно про внутреннее устройство XCom и как с ним работать я писал в заметке Apache Airflow и XCom в блоге.
Да, верно, конкретно для PostgresOperator это и есть params с передачей словаря. Не у всех операторов он есть, также он может называться иначе.
спасибо
kgbt++ 21 Июнь 2023
в документации к PostgresOperator в списке параметров есть вот такая запись --- "parameters – (optional) the parameters to render the SQL query with." это тот же параметр(variable) params который вы прописываете? или переменную params можно прописать в любом операторе?