Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,20 @@ class DuckDBConnectionConfig(BaseDuckDBConnectionConfig):
DISPLAY_NAME: t.ClassVar[t.Literal["DuckDB"]] = "DuckDB"
DISPLAY_ORDER: t.ClassVar[t.Literal[1]] = 1

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
config: t.Dict[str, t.Any] = {}
if self.secrets:
if isinstance(self.secrets, list):
secrets_items = [(secret_dict, "") for secret_dict in self.secrets]
else:
secrets_items = [
(secret_dict, secret_name)
for secret_name, secret_dict in self.secrets.items()
]
config["s3_secrets_config"] = secrets_items
return config


class SnowflakeConnectionConfig(ConnectionConfig):
"""Configuration for the Snowflake connection.
Expand Down
84 changes: 84 additions & 0 deletions sqlmesh/core/engine_adapter/duckdb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import logging
import threading
import time
import typing as t
from sqlglot import exp
from pathlib import Path
Expand All @@ -23,6 +26,8 @@
from sqlmesh.core._typing import SchemaName, TableName
from sqlmesh.core.engine_adapter._typing import DF

logger = logging.getLogger(__name__)


@set_catalog(override_mapping={"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG})
class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin):
Expand All @@ -38,6 +43,85 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
SUPPORTS_CREATE_DROP_CATALOG = True
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]

# DuckDB STS AssumeRole tokens expire after 900s (15 min, the AWS minimum).
# DuckDB's `refresh: auto` does not actually refresh credentials mid-session.
# We proactively recreate secrets before expiry to avoid HTTP 400 errors.
# See: https://github.com/duckdb/duckdb-httpfs/pull/165
# Consumed by _maybe_refresh_s3_secrets as the rotation threshold.
_S3_SECRET_REFRESH_INTERVAL_S: int = 720  # 12 minutes (80% of 900s TTL)

def __init__(self, *args: t.Any, **kwargs: t.Any):
    """Initialize the adapter and capture the S3 secrets refresh state."""
    super().__init__(*args, **kwargs)
    # s3_secrets_config arrives via _extra_engine_config from DuckDBConnectionConfig
    # and is stored in self._extra_config by the base EngineAdapter.__init__;
    # it therefore survives with_settings() automatically.
    secrets_config: t.Optional[t.List[t.Tuple[t.Dict[str, t.Any], str]]] = (
        self._extra_config.get("s3_secrets_config")
    )
    self._s3_secrets_config = secrets_config
    # Serializes refreshes so concurrent statements trigger at most one rotation.
    self._s3_secret_refresh_lock = threading.Lock()
    # Baseline timestamp; secrets are presumed fresh at adapter construction.
    self._last_secret_refresh_ts = time.monotonic()

def _refresh_s3_secrets(self) -> None:
    """Drop and recreate all S3 secrets to obtain fresh STS credentials.

    DuckDB's httpfs extension snapshots STS credentials at secret creation time
    and never re-queries the provider chain. When using AssumeRole with the AWS
    minimum session duration (900s / 15 min), credentials expire mid-run for any
    plan that takes longer than 15 minutes, causing HTTP 400 errors from S3.

    This method forces fresh AssumeRole calls by dropping existing S3 secrets
    and recreating them from the original config.

    Raises:
        Exception: re-raises any error from recreating a secret; drop failures
            are only logged since the subsequent CREATE OR REPLACE supersedes them.
    """
    if not self._s3_secrets_config:
        # Defensive guard: callers check this, but a direct call with no
        # config must not crash iterating None below.
        return

    cursor = self.cursor

    # Drop all existing S3-type secrets by querying their names from DuckDB.
    try:
        cursor.execute("SELECT name FROM duckdb_secrets() WHERE type = 's3'")
        existing_secrets = [row[0] for row in cursor.fetchall()]
    except Exception:
        # duckdb_secrets() can fail (e.g. extension not loaded); treat as none
        # and fall through to recreation, which is idempotent.
        existing_secrets = []

    for secret_name in existing_secrets:
        try:
            # Quote the identifier (escaping embedded double quotes) — the name
            # comes back from DuckDB and may not be a plain identifier.
            quoted_name = '"' + secret_name.replace('"', '""') + '"'
            cursor.execute(f"DROP SECRET IF EXISTS {quoted_name}")
        except Exception as e:
            logger.warning("Failed to drop secret '%s' during refresh: %s", secret_name, e)

    # Recreate from config.
    for secret_dict, secret_name in self._s3_secrets_config:
        secret_settings = []
        for field, setting in secret_dict.items():
            # Escape single quotes so a value containing ' cannot break out of
            # (or inject into) the generated SQL string literal.
            value = str(setting).replace("'", "''")
            secret_settings.append(f"{field} '{value}'")
        if secret_settings:
            secret_clause = ", ".join(secret_settings)
            try:
                cursor.execute(
                    f"CREATE OR REPLACE SECRET {secret_name} ({secret_clause});"
                )
            except Exception as e:
                logger.error("Failed to recreate secret during refresh: %s", e)
                raise

    logger.info("Refreshed DuckDB S3 secrets (STS credential rotation)")

def _maybe_refresh_s3_secrets(self) -> None:
"""Refresh S3 secrets if approaching the STS token expiry threshold."""
if not self._s3_secrets_config:
return
if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S:
return

with self._s3_secret_refresh_lock:
# Double-check after acquiring lock — another thread may have refreshed
if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S:
return

self._refresh_s3_secrets()
self._last_secret_refresh_ts = time.monotonic()

def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None:
    # Rotate S3 secrets first (if configured and near the STS expiry threshold)
    # so long-running plans do not fail mid-run with expired AssumeRole credentials,
    # then delegate the actual execution to the base adapter.
    self._maybe_refresh_s3_secrets()
    super()._execute(sql, track_rows_processed, **kwargs)

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for DuckDB: full catalog support."""
    return CatalogSupport.FULL_SUPPORT
Expand Down