From 325c9aa4aca741c85540218a078c77d939c19862 Mon Sep 17 00:00:00 2001 From: Mike Menzies Date: Sun, 1 Mar 2026 16:11:32 -0600 Subject: [PATCH] fix(duckdb): proactively refresh S3 secrets to prevent STS credential expiry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DuckDB's httpfs extension snapshots STS credentials into a secret at creation time and never re-queries the provider chain, even with `refresh: auto` configured. When using `CREDENTIAL_CHAIN` with `chain: sts` (AssumeRole), DuckDB requests the AWS minimum session duration of 900 seconds (15 minutes). Any SQLMesh plan that takes longer than 15 minutes to complete — such as large incremental backfills — fails with HTTP 400 (Bad Request) from S3 once the token expires. This was confirmed through controlled experiments (binary search narrowed the threshold to ~15 minutes, and `duckdb_secrets()` showed the same `key_id` throughout a run — no refresh ever occurs). The upstream fix (duckdb/duckdb-httpfs#165) adds credential refresh at the httpfs layer but has not been merged yet. 
This patch adds a timer-based secret refresh mechanism to the DuckDB engine adapter as a workaround: - Before each SQL execution, checks if 12 minutes (80% of the 900s TTL) have elapsed since the last secret creation - If so, queries `duckdb_secrets()` for existing S3 secret names, drops them, and recreates from the original config — forcing a fresh STS AssumeRole call - Uses double-check locking to prevent concurrent refresh when `concurrent_tasks > 1` - Zero overhead for configs without S3 secrets (early return on null check) and minimal overhead otherwise (monotonic clock comparison on hot path) Changes: - sqlmesh/core/engine_adapter/duckdb.py: Add __init__, _execute override, and secret refresh methods to DuckDBEngineAdapter - sqlmesh/core/config/connection.py: Add _extra_engine_config to DuckDBConnectionConfig to pass secrets config to the adapter This workaround can be removed once the upstream duckdb-httpfs fix lands and we upgrade DuckDB. Refs: https://github.com/duckdb/duckdb-httpfs/pull/165 Refs: https://github.com/duckdb/duckdb-aws/issues/26 --- sqlmesh/core/config/connection.py | 14 +++++ sqlmesh/core/engine_adapter/duckdb.py | 84 +++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 9e3a210e5e..584db35a99 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -561,6 +561,20 @@ class DuckDBConnectionConfig(BaseDuckDBConnectionConfig): DISPLAY_NAME: t.ClassVar[t.Literal["DuckDB"]] = "DuckDB" DISPLAY_ORDER: t.ClassVar[t.Literal[1]] = 1 + @property + def _extra_engine_config(self) -> t.Dict[str, t.Any]: + config: t.Dict[str, t.Any] = {} + if self.secrets: + if isinstance(self.secrets, list): + secrets_items = [(secret_dict, "") for secret_dict in self.secrets] + else: + secrets_items = [ + (secret_dict, secret_name) + for secret_name, secret_dict in self.secrets.items() + ] + config["s3_secrets_config"] = secrets_items + 
return config + class SnowflakeConnectionConfig(ConnectionConfig): """Configuration for the Snowflake connection. diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index 3b057219e0..c354e504f5 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -1,5 +1,8 @@ from __future__ import annotations +import logging +import threading +import time import typing as t from sqlglot import exp from pathlib import Path @@ -23,6 +26,8 @@ from sqlmesh.core._typing import SchemaName, TableName from sqlmesh.core.engine_adapter._typing import DF +logger = logging.getLogger(__name__) + @set_catalog(override_mapping={"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG}) class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin): @@ -38,6 +43,85 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, SUPPORTS_CREATE_DROP_CATALOG = True SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"] + # DuckDB STS AssumeRole tokens expire after 900s (15 min, the AWS minimum). + # DuckDB's `refresh: auto` does not actually refresh credentials mid-session. + # We proactively recreate secrets before expiry to avoid HTTP 400 errors. + # See: https://github.com/duckdb/duckdb-httpfs/pull/165 + _S3_SECRET_REFRESH_INTERVAL_S = 720 # 12 minutes (80% of 900s TTL) + + def __init__(self, *args: t.Any, **kwargs: t.Any): + super().__init__(*args, **kwargs) + # s3_secrets_config arrives via _extra_engine_config from DuckDBConnectionConfig + # and is stored in self._extra_config by the base EngineAdapter.__init__. + # It propagates through with_settings() automatically via _extra_config. 
+ self._s3_secrets_config: t.Optional[t.List[t.Tuple[t.Dict[str, t.Any], str]]] = ( + self._extra_config.get("s3_secrets_config") + ) + self._s3_secret_refresh_lock = threading.Lock() + self._last_secret_refresh_ts = time.monotonic() + + def _refresh_s3_secrets(self) -> None: + """Drop and recreate all S3 secrets to obtain fresh STS credentials. + + DuckDB's httpfs extension snapshots STS credentials at secret creation time + and never re-queries the provider chain. When using AssumeRole with the AWS + minimum session duration (900s / 15 min), credentials expire mid-run for any + plan that takes longer than 15 minutes, causing HTTP 400 errors from S3. + + This method forces fresh AssumeRole calls by dropping existing S3 secrets + and recreating them from the original config. + """ + cursor = self.cursor + + # Drop all existing S3-type secrets by querying their names from DuckDB + try: + cursor.execute( + "SELECT name FROM duckdb_secrets() WHERE type = 's3'" + ) + existing_secrets = [row[0] for row in cursor.fetchall()] + except Exception: + existing_secrets = [] + + for secret_name in existing_secrets: + try: + cursor.execute(f"DROP SECRET IF EXISTS {secret_name}") + except Exception as e: + logger.warning("Failed to drop secret '%s' during refresh: %s", secret_name, e) + + # Recreate from config + for secret_dict, secret_name in self._s3_secrets_config: # type: ignore + secret_settings = [f"{field} '{setting}'" for field, setting in secret_dict.items()] + if secret_settings: + secret_clause = ", ".join(secret_settings) + try: + cursor.execute( + f"CREATE OR REPLACE SECRET {secret_name} ({secret_clause});" + ) + except Exception as e: + logger.error("Failed to recreate secret during refresh: %s", e) + raise + + logger.info("Refreshed DuckDB S3 secrets (STS credential rotation)") + + def _maybe_refresh_s3_secrets(self) -> None: + """Refresh S3 secrets if approaching the STS token expiry threshold.""" + if not self._s3_secrets_config: + return + if time.monotonic() - 
self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S: + return + + with self._s3_secret_refresh_lock: + # Double-check after acquiring lock — another thread may have refreshed + if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S: + return + + self._refresh_s3_secrets() + self._last_secret_refresh_ts = time.monotonic() + + def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None: + self._maybe_refresh_s3_secrets() + super()._execute(sql, track_rows_processed, **kwargs) + @property def catalog_support(self) -> CatalogSupport: return CatalogSupport.FULL_SUPPORT