diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py
index 9e3a210e5e..584db35a99 100644
--- a/sqlmesh/core/config/connection.py
+++ b/sqlmesh/core/config/connection.py
@@ -561,6 +561,20 @@ class DuckDBConnectionConfig(BaseDuckDBConnectionConfig):
     DISPLAY_NAME: t.ClassVar[t.Literal["DuckDB"]] = "DuckDB"
     DISPLAY_ORDER: t.ClassVar[t.Literal[1]] = 1
+    @property
+    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
+        config: t.Dict[str, t.Any] = {}
+        if self.secrets:
+            if isinstance(self.secrets, list):
+                secrets_items = [(secret_dict, "") for secret_dict in self.secrets]
+            else:
+                secrets_items = [
+                    (secret_dict, secret_name)
+                    for secret_name, secret_dict in self.secrets.items()
+                ]
+            config["s3_secrets_config"] = secrets_items
+        return config
+
 
 
 class SnowflakeConnectionConfig(ConnectionConfig):
     """Configuration for the Snowflake connection.
diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py
index 3b057219e0..c354e504f5 100644
--- a/sqlmesh/core/engine_adapter/duckdb.py
+++ b/sqlmesh/core/engine_adapter/duckdb.py
@@ -1,5 +1,8 @@
 from __future__ import annotations
 
+import logging
+import threading
+import time
 import typing as t
 from sqlglot import exp
 from pathlib import Path
@@ -23,6 +26,8 @@
 from sqlmesh.core._typing import SchemaName, TableName
 from sqlmesh.core.engine_adapter._typing import DF
+
+logger = logging.getLogger(__name__)
 
 
 @set_catalog(override_mapping={"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG})
 class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin):
@@ -38,6 +43,85 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin):
     SUPPORTS_CREATE_DROP_CATALOG = True
     SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
+    # DuckDB STS AssumeRole tokens expire after 900s (15 min, the AWS minimum).
+    # DuckDB's `refresh: auto` does not actually refresh credentials mid-session.
+    # We proactively recreate secrets before expiry to avoid HTTP 400 errors.
+    # See: https://github.com/duckdb/duckdb-httpfs/pull/165
+    _S3_SECRET_REFRESH_INTERVAL_S = 720  # 12 minutes (80% of 900s TTL)
+
+    def __init__(self, *args: t.Any, **kwargs: t.Any):
+        super().__init__(*args, **kwargs)
+        # s3_secrets_config arrives via _extra_engine_config from DuckDBConnectionConfig
+        # and is stored in self._extra_config by the base EngineAdapter.__init__.
+        # It propagates through with_settings() automatically via _extra_config.
+        self._s3_secrets_config: t.Optional[t.List[t.Tuple[t.Dict[str, t.Any], str]]] = (
+            self._extra_config.get("s3_secrets_config")
+        )
+        self._s3_secret_refresh_lock = threading.Lock()
+        self._last_secret_refresh_ts = time.monotonic()
+
+    def _refresh_s3_secrets(self) -> None:
+        """Drop and recreate all S3 secrets to obtain fresh STS credentials.
+
+        DuckDB's httpfs extension snapshots STS credentials at secret creation time
+        and never re-queries the provider chain. When using AssumeRole with the AWS
+        minimum session duration (900s / 15 min), credentials expire mid-run for any
+        plan that takes longer than 15 minutes, causing HTTP 400 errors from S3.
+
+        This method forces fresh AssumeRole calls by dropping existing S3 secrets
+        and recreating them from the original config.
+        """
+        cursor = self.cursor
+
+        # Drop all existing S3-type secrets by querying their names from DuckDB
+        try:
+            cursor.execute(
+                "SELECT name FROM duckdb_secrets() WHERE type = 's3'"
+            )
+            existing_secrets = [row[0] for row in cursor.fetchall()]
+        except Exception:
+            existing_secrets = []
+
+        for secret_name in existing_secrets:
+            try:
+                cursor.execute(f"DROP SECRET IF EXISTS {secret_name}")
+            except Exception as e:
+                logger.warning("Failed to drop secret '%s' during refresh: %s", secret_name, e)
+
+        # Recreate from config
+        for secret_dict, secret_name in self._s3_secrets_config:  # type: ignore
+            secret_settings = [f"{field} '{setting}'" for field, setting in secret_dict.items()]
+            if secret_settings:
+                secret_clause = ", ".join(secret_settings)
+                try:
+                    cursor.execute(
+                        f"CREATE OR REPLACE SECRET {secret_name} ({secret_clause});"
+                    )
+                except Exception as e:
+                    logger.error("Failed to recreate secret during refresh: %s", e)
+                    raise
+
+        logger.info("Refreshed DuckDB S3 secrets (STS credential rotation)")
+
+    def _maybe_refresh_s3_secrets(self) -> None:
+        """Refresh S3 secrets if approaching the STS token expiry threshold."""
+        if not self._s3_secrets_config:
+            return
+        if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S:
+            return
+
+        with self._s3_secret_refresh_lock:
+            # Double-check after acquiring lock — another thread may have refreshed
+            if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S:
+                return
+
+            self._refresh_s3_secrets()
+            self._last_secret_refresh_ts = time.monotonic()
+
+    def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None:
+        self._maybe_refresh_s3_secrets()
+        super()._execute(sql, track_rows_processed, **kwargs)
+
 
     @property
     def catalog_support(self) -> CatalogSupport:
         return CatalogSupport.FULL_SUPPORT