Шаг 1 из 1

У нас успешно получилось реализовать пайплайн по загрузке курсов валют в базу. Я лично пользуюсь подобным пайплайном для загрузки ряда интересующих меня валют на ежедневной основе (а также котировок акций). Одна валюта это прикольно, а что если мы хотим грузить список таких валют?

Неужели придётся создавать на каждую валюту свой пайплайн? Это один из вариантов решения. Но такой подход нарушает принцип DRY (Do not Repeat Yourself), т.е. мы плодим один и тот же код. А что если сгенерировать на каждую валюту свою цепочку тасков?

В нашем пайплайне есть первый таск, отвечающий за создание таблицы, у всех валют он будет один и тот же. А вот последующие 2 отличаются. Можно заранее определить список валютных пар (base и currency) и в цикле генерировать 2 таска по получению курса и укладке в таблицу. В итоге должен получиться подобный граф:

А вот как выглядит код для такого пайплайна:

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

from .operator import CurrencyScoopOperator

with DAG(
        dag_id='exchange_rate_dynamic',
        start_date=datetime(2021, 3, 1),
        schedule_interval='@daily',
        catchup=False,
) as dag:

    create_table = PostgresOperator(
        task_id='create_table_task',
        sql='sql/create_table.sql',
        postgres_conn_id='postgres_default',
    )

    tasks = []

    for base, currency in [
        ('USD', 'KZT'),
        ('USD', 'RUB'),
        ('USD', 'EUR'),
        ('KZT', 'RUB'),
        ('RUB', 'KZT'),
        ('EUR', 'KZT'),
        ('EUR', 'RUB'),
    ]:
        get_rate_task = CurrencyScoopOperator(
            task_id=f'get_rate_{ base }_{ currency }',
            base_currency=base,
            currency=currency,
            conn_id='cur_scoop_conn_id',
            dag=dag,
            do_xcom_push=True,
        )

        insert_rate = PostgresOperator(
            task_id=f'insert_rate_{ base }_{ currency }',
            postgres_conn_id='postgres_default',
            sql='sql/insert_rate.sql',
            params={
                'base_currency': base,
                'currency': currency,
                'get_rate_task_id': f'get_rate_{ base }_{ currency }'
            }
        )

        get_rate_task >> insert_rate

        tasks.append(get_rate_task)

    create_table.set_downstream(tasks)

Обратите внимание, что в таске insert_rate я дополнительно передаю параметр с названием таска, т.к. они динамические, то и их ID будет отличаться. Это название необходимо для получения курса валют в XCom. В связи с этим есть незначительные изменения и в шаблоне SQL:

INSERT INTO
    currency_exchange_rates
VALUES ('{{ params.base_currency }}', '{{ params.currency }}', {{ ti.xcom_pull(task_ids=params.get_rate_task_id) }}, '{{ execution_date.strftime("%Y-%m-%d") }}')
ON CONFLICT (base, currency, date) DO
    UPDATE
        SET rate = excluded.rate;

Полный код можно найти в репозитории.

Комментарии

непонятно что делают эти строчки? create_table.set_downstream(tasks) Что это за метод set_downstream? Если tasks.append(get_rate_task) добавляет в цикле новый таск get_rate_task, то почему нет строчки tasks.append(insert_rate), ведь нам также в цикле нужно создать таски insert_rate?

Илья, в строчке get_rate_task >> insert_rate мы определяем направление зависимости между get_rate_task и insert_rate. Что означает, что таск insert_rate должен запускаться после успешного выполнения get_rate_task. То есть для insert_rate upstream-таском будет get_rate_task. А для get_rate_task таск insert_rate тогда является downstream. Т.к. я определил зависимость между ними, мне достаточно в список добавить только один из них (get_rate_task). По поводу .set_downstream. В метод можно передавать как 1 таск так и список из тасков. Означает он что зависимостью для всех тасков из списка tasks будет являеться один единственный таск create_table.