Шаг 1 из 2

Теперь у нас есть всё, чтобы сделать полноценный DAG для ежедневной загрузки курсов валют в базу. Убедитесь, что вы прописали соединение (postgres_default) в Admin → Connections и ваша база данных доступна.

Код DAGа:

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

from .operator import CurrencyScoopOperator

with DAG(
        dag_id='exchange_rate_usd_kzt_dag',
        start_date=datetime(2021, 3, 1),
        schedule_interval='@daily',
) as dag:

    create_table = PostgresOperator(
        task_id='create_table_task',
        sql='sql/create_table.sql',
        postgres_conn_id='postgres_default',
    )

    get_rate = CurrencyScoopOperator(
        task_id='get_rate',
        base_currency='USD',
        currency='KZT',
        conn_id='cur_scoop_conn_id',
        dag=dag,
        do_xcom_push=True,
    )

    insert_rate = PostgresOperator(
        task_id='insert_rate',
        postgres_conn_id='postgres_default',
        sql='sql/insert_rate.sql',
        params={
            'base_currency': 'USD',
            'currency': 'KZT',
        }
    )

    create_table >> get_rate >> insert_rate

Ну не красота ли? А теперь давайте взглянем на это в действии.

Комментарии

У меня ошибка с этим кодом - Broken DAG: [/opt/airflow/dags/exchange_rate_usd_kzt_dag.py] Traceback (most recent call last): File "<frozen importlib._bootstrap>

Broken DAG: [/opt/airflow/dags/exchange_rate_usd_kzt_dag.py] Traceback (most recent call last): File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed File "/opt/airflow/dags/exchange_rate_usd_kzt_dag.py", line 6, in <module> from .operator import CurrencyScoopOperator ImportError: attempted relative import with no known parent package

Михаил, можно попробовать изменить импорт на: from exchange_rates.operator import CurrencyScoopOperator