Шаг 1 из 3

create_table

Первый таск из нашего пайплайна отвечает за создание таблицы, если её нет. В качестве базы данных используется PostgreSQL, можно поднять новый docker-образ базы либо использовать существующее соединение для Airflow. Если вы предпочитаете Docker, то достаточно выполнить команду:

docker run -d -p 15432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_USER=airflow_user \
-e POSTGRES_DB=airflow_examples \
-v postgres_airflow_example:/var/lib/postgresql/data postgres:13.1

В этом случае на хост будет прокинут порт 15432 на котором крутится БД Postgres. Имя пользователя, пароль и название БД можно указать по своему желанию.

В Admin → Connections необходимо обновить соединение postgres_default как показано на скрине:

Это соединение используется по умолчанию для всех Postgres операторов, безусловно можно создать новое и явно указывать его ID при создании оператора.

Для работы с Postgres в Apache Airflow есть PostgresOperator. Под капотом он использует библиотеку psycopg2 для работы с базой. Именно его мы будем использовать в двух из трёх наших операторах:

  1. create_table
  2. insert_rate

PostgresOperator может принимать запрос строкой либо читать его из файла. Я предпочитаю второй вариант, т.к. он делает код более читабельным без смешивания SQL и Python. Пример оператора с запросом строкой:

sql_task = PostgresOperator(
    task_id='sql_task_id',
    postgres_conn_id='postgres_default',
    sql='SELECT foo FROM bar;',
)

Через указание пути до SQL файла:

sql_task = PostgresOperator(
    task_id='sql_task_id',
    postgres_conn_id='postgres_default',
    sql='sql/select_query.sql',
)

Мы выбираем второй вариант, поэтому необходимо подготовить SQL запрос создания таблицы и сохранить его в файл create_table.sql:

CREATE TABLE IF NOT EXISTS currency_exchange_rates (
    base VARCHAR(3) NOT NULL,
    currency VARCHAR(3) NOT NULL,
    rate NUMERIC(12, 3) NOT NULL,
    date DATE NOT NULL,
    UNIQUE (base, currency, date)
);

При каждом запуске DAG будет выполняться таск create_table, поэтому в SQL выражении есть конструкция IF NOT EXISTS, которая означает, что таблица должна быть создана ТОЛЬКО, если её нет. То есть при повторном выполнении этого таска Postgres проигнорирует этот запрос, т.к. таблица уже будет существовать.

Таблица currency_exchange_rates:

  1. base — хранится код базовой валюты
  2. currency — хранится код конвертируемой валюты
  3. rate — обменный курс между currency и base, т.е. стоимость base в currency
  4. date — дата

Свойство UNIQUE по трём столбцам нам необходимо, чтобы делать вставку через UPSERT. Если по какой-то причине таск с одной и той же датой будет запущен несколько раз нужно не допустить дублирования. Более подробно про UPSERT в PostgreSQL можно почитать здесь.

Итак, всё готово для создания первого оператора:

create_table = PostgresOperator(
    task_id='create_table_task',
    sql='sql/create_table.sql',
    postgres_conn_id='postgres_default',
)

Путь SQL файла необходимо указывать относительно директории, где лежит сам DAG. В данном случае папка sql лежит в одной директории с кодом DAGа. Аргумент postgres_conn_id можно не указывать, т.к. по умолчанию он будет равен postgres_default, но я предпочитаю явно это делать.

Комментарии

2. currency Опечатка. Код конвертируемой валютЫ

Добрый день! На скрине с Connections: HOST - airflow_user Это корректно?

Нет. Автор ошибся

Какой хост тогда будет корректный? Указываю localhost - получаю ошибку psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "localhost" and accepting TCP/IP connections on port 15432? При это ДБивером к бд подключаюсь

Попробуй явно указать ip

Увы, снова получаю psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "127.0.0.1" and accepting TCP/IP connections on port 15432? Контейнер перезапускал, переделывал - результат тот же.

Крайне вероятно что у докера другой ip

насчет хоста: тоже не оч понимала что туда вписывать, но я пользовалась докерами и помогло host.docker.internal вместо localhost

Здравствуйте! Подскажите как проверить что созданный коннектор работает? Дело в том, что я создал свой коннектор к PostgreSQL. Host = localhost Schema = stg Login = uyagr Password = pass1 Port = 5432 В моем инстансе постгреса несколько баз. Где прописывать базу не нашел. Подскажите как это сделать?

Поясните пожалуйста про коннекторы с Conn Type:=Amazon WebServises. Дело в том, что почти у всех коннекторов, если провалиться в него указан амазоновский коннектор, но когда нахожусь на страничке http://127.0.0.1:8080/connection/list/, где указан список имеющихся коннекторов, Conn Type другой. Например Conn Id = mssql_default или Conn Id = mongo_default. К чему я могу подключиться используя приведенные выше коннекторы?