Первый таск из нашего пайплайна отвечает за создание таблицы, если её нет. В качестве базы данных используется PostgreSQL, можно поднять новый docker-образ базы либо использовать существующее соединение для Airflow. Если вы предпочитаете Docker, то достаточно выполнить команду:
docker run -d -p 15432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_USER=airflow_user \
-e POSTGRES_DB=airflow_examples \
-v postgres_airflow_example:/var/lib/postgresql/data postgres:13.1
В этом случае на хост будет прокинут порт 15432 на котором крутится БД Postgres. Имя пользователя, пароль и название БД можно указать по своему желанию.
В Admin → Connections необходимо обновить соединение postgres_default
как показано на скрине:
Это соединение используется по умолчанию для всех Postgres операторов, безусловно можно создать новое и явно указывать его ID при создании оператора.
Для работы с Postgres в Apache Airflow есть PostgresOperator. Под капотом он использует библиотеку psycopg2 для работы с базой. Именно его мы будем использовать в двух из трёх наших операторах:
PostgresOperator может принимать запрос строкой либо читать его из файла. Я предпочитаю второй вариант, т.к. он делает код более читабельным без смешивания SQL и Python. Пример оператора с запросом строкой:
sql_task = PostgresOperator(
task_id='sql_task_id',
postgres_conn_id='postgres_default',
sql='SELECT foo FROM bar;',
)
Через указание пути до SQL файла:
sql_task = PostgresOperator(
task_id='sql_task_id',
postgres_conn_id='postgres_default',
sql='sql/select_query.sql',
)
Мы выбираем второй вариант, поэтому необходимо подготовить SQL запрос создания таблицы и сохранить его в файл create_table.sql
:
CREATE TABLE IF NOT EXISTS currency_exchange_rates (
base VARCHAR(3) NOT NULL,
currency VARCHAR(3) NOT NULL,
rate NUMERIC(12, 3) NOT NULL,
date DATE NOT NULL,
UNIQUE (base, currency, date)
);
При каждом запуске DAG будет выполняться таск create_table
, поэтому в SQL выражении есть конструкция IF NOT EXISTS
, которая означает, что таблица должна быть создана ТОЛЬКО, если её нет. То есть при повторном выполнении этого таска Postgres проигнорирует этот запрос, т.к. таблица уже будет существовать.
Таблица currency_exchange_rates:
Свойство UNIQUE по трём столбцам нам необходимо, чтобы делать вставку через UPSERT. Если по какой-то причине таск с одной и той же датой будет запущен несколько раз нужно не допустить дублирования. Более подробно про UPSERT в PostgreSQL можно почитать здесь.
Итак, всё готово для создания первого оператора:
create_table = PostgresOperator(
task_id='create_table_task',
sql='sql/create_table.sql',
postgres_conn_id='postgres_default',
)
Путь SQL файла необходимо указывать относительно директории, где лежит сам DAG. В данном случае папка sql лежит в одной директории с кодом DAGа. Аргумент postgres_conn_id
можно не указывать, т.к. по умолчанию он будет равен postgres_default
, но я предпочитаю явно это делать.
Добрый день! На скрине с Connections: HOST - airflow_user Это корректно?
Нет. Автор ошибся
Какой хост тогда будет корректный? Указываю localhost - получаю ошибку psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "localhost" and accepting TCP/IP connections on port 15432? При это ДБивером к бд подключаюсь
Попробуй явно указать ip
Увы, снова получаю psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "127.0.0.1" and accepting TCP/IP connections on port 15432? Контейнер перезапускал, переделывал - результат тот же.
Крайне вероятно что у докера другой ip
насчет хоста: тоже не оч понимала что туда вписывать, но я пользовалась докерами и помогло host.docker.internal вместо localhost
Здравствуйте! Подскажите как проверить что созданный коннектор работает? Дело в том, что я создал свой коннектор к PostgreSQL. Host = localhost Schema = stg Login = uyagr Password = pass1 Port = 5432 В моем инстансе постгреса несколько баз. Где прописывать базу не нашел. Подскажите как это сделать?
Поясните пожалуйста про коннекторы с Conn Type:=Amazon WebServises. Дело в том, что почти у всех коннекторов, если провалиться в него указан амазоновский коннектор, но когда нахожусь на страничке http://127.0.0.1:8080/connection/list/, где указан список имеющихся коннекторов, Conn Type другой. Например Conn Id = mssql_default или Conn Id = mongo_default. К чему я могу подключиться используя приведенные выше коннекторы?
Slava 10 Апрель 2022
2. currency Опечатка. Код конвертируемой валютЫ