Skip to content

Commit 7e7f463

Browse files
committed
Fix: Connection config parsing on the Airflow side (#1439)
1 parent f7e0e19 commit 7e7f463

3 files changed

Lines changed: 12 additions & 11 deletions

File tree

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
RedshiftConnectionConfig,
1313
SnowflakeConnectionConfig,
1414
SparkConnectionConfig,
15+
parse_connection_config,
1516
)
1617
from sqlmesh.core.config.gateway import GatewayConfig
1718
from sqlmesh.core.config.loader import load_config_from_paths, load_config_from_yaml

sqlmesh/core/config/connection.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -807,12 +807,7 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
807807
}
808808

809809

810-
def _connection_config_validator(
811-
cls: t.Type, v: ConnectionConfig | t.Dict[str, t.Any] | None
812-
) -> ConnectionConfig | None:
813-
if v is None or isinstance(v, ConnectionConfig):
814-
return v
815-
810+
def parse_connection_config(v: t.Dict[str, t.Any]) -> ConnectionConfig:
816811
if "type" not in v:
817812
raise ConfigError("Missing connection type.")
818813

@@ -823,6 +818,14 @@ def _connection_config_validator(
823818
return CONNECTION_CONFIG_TO_TYPE[connection_type](**v)
824819

825820

821+
def _connection_config_validator(
822+
cls: t.Type, v: ConnectionConfig | t.Dict[str, t.Any] | None
823+
) -> ConnectionConfig | None:
824+
if v is None or isinstance(v, ConnectionConfig):
825+
return v
826+
return parse_connection_config(v)
827+
828+
826829
connection_config_validator = field_validator(
827830
"connection",
828831
"state_connection",

sqlmesh/schedulers/airflow/util.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import typing as t
77
from datetime import timedelta
88

9-
import pydantic
109
from airflow import settings
1110
from airflow.api.common.experimental.delete_dag import delete_dag
1211
from airflow.exceptions import AirflowException, DagNotFound
@@ -17,7 +16,7 @@
1716
from sqlalchemy.orm import Session
1817

1918
from sqlmesh.core import constants as c
20-
from sqlmesh.core.config import ConnectionConfig
19+
from sqlmesh.core.config import parse_connection_config
2120
from sqlmesh.core.engine_adapter import create_engine_adapter
2221
from sqlmesh.core.state_sync import EngineAdapterStateSync, StateSync
2322
from sqlmesh.schedulers.airflow import common
@@ -53,9 +52,7 @@ def scoped_state_sync() -> t.Iterator[StateSync]:
5352

5453
logger.info("Using connection '%s' for state sync", connection.conn_id)
5554

56-
connection_config: ConnectionConfig = pydantic.parse_obj_as(
57-
ConnectionConfig, connection_config_dict # type: ignore
58-
)
55+
connection_config = parse_connection_config(connection_config_dict)
5956
engine_adapter = connection_config.create_engine_adapter()
6057
except AirflowException:
6158
logger.info("Using the Airflow database connection for state sync")

0 commit comments

Comments
 (0)