Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions controlplane/session_search_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,37 @@ const physicalIcebergCatalog = iceberg.CatalogName

// effectiveSessionDefaultCommand returns the connect-time command for a
// non-passthrough session, given the resolved real catalog the session defaults
// to (effectiveCatalog, one of "ducklake"/"iceberg"). A client-supplied
// search_path always wins. For DuckLake the catalog switch is owned by
// InitSessionDatabaseMetadata's defer (which also restores memory.main on the
// search_path so the pg_catalog compat macros stay resolvable), so this returns
// "" — re-issuing `USE ducklake` here would clobber that search_path.
// to (effectiveCatalog, one of "ducklake"/"iceberg").
//
// For DuckLake the catalog switch is owned by InitSessionDatabaseMetadata's
// defer (which also restores memory.main on the search_path so the pg_catalog
// compat macros stay resolvable), so a DuckLake session only needs a command
// when the client supplied its own search_path.
//
// For Iceberg there is no such defer, so the `USE iceberg.<schema>` catalog
// switch MUST be issued here — even when the client also supplied a search_path.
// Otherwise the session stays in the ephemeral `memory` catalog while
// current_database() reports 'iceberg', and unqualified DDL/DML silently misses
// the warehouse. The Iceberg USE is load-bearing, so when it is combined with a
// client search_path the command fails closed (sessionDefaultSourceConfiguredCatalog)
// rather than treating the whole thing as a best-effort search_path.
func effectiveSessionDefaultCommand(clientSearchPath, effectiveCatalog string) (string, sessionSearchPathSource) {
icebergUse := ""
if effectiveCatalog == iceberg.CatalogName {
icebergUse = fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema)
}

switch {
case clientSearchPath != "":
return fmt.Sprintf("SET search_path = '%s'", ensureMemoryMainInSearchPath(clientSearchPath)), sessionSearchPathSourceClient
case effectiveCatalog == iceberg.CatalogName:
return fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema), sessionDefaultSourceConfiguredCatalog
searchPath := fmt.Sprintf("SET search_path = '%s'", ensureMemoryMainInSearchPath(clientSearchPath))
if icebergUse != "" {
// Switch into Iceberg first, then apply the client search_path (the
// USE resets it). The catalog switch is fail-closed.
return icebergUse + "; " + searchPath, sessionDefaultSourceConfiguredCatalog
}
return searchPath, sessionSearchPathSourceClient
case icebergUse != "":
return icebergUse, sessionDefaultSourceConfiguredCatalog
default:
return "", ""
}
Expand Down
27 changes: 21 additions & 6 deletions controlplane/session_search_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@ package controlplane

import "testing"

func TestEffectiveSessionDefaultCommandUsesClientSearchPathBeforeCatalog(t *testing.T) {
got, source := effectiveSessionDefaultCommand("ducklake.main", "iceberg")
if got != "SET search_path = 'ducklake.main,memory.main'" {
t.Fatalf("command = %q, want SET search_path = 'ducklake.main,memory.main'", got)
func TestEffectiveSessionDefaultCommandIcebergSwitchSurvivesClientSearchPath(t *testing.T) {
// An Iceberg session with a client-supplied search_path must STILL switch
// into the Iceberg catalog (there is no InitSessionDatabaseMetadata defer to
// do it). The catalog switch precedes the search_path and is fail-closed.
got, source := effectiveSessionDefaultCommand("public", "iceberg")
if got != "USE iceberg.public; SET search_path = 'public,memory.main'" {
t.Fatalf("command = %q, want USE iceberg.public; SET search_path = 'public,memory.main'", got)
}
if source != sessionDefaultSourceConfiguredCatalog {
t.Fatalf("source = %q, want %q", source, sessionDefaultSourceConfiguredCatalog)
}
}

func TestEffectiveSessionDefaultCommandDuckLakeClientSearchPathOnly(t *testing.T) {
// DuckLake's catalog switch is owned by InitSessionDatabaseMetadata's defer,
// so a client search_path is applied alone and best-effort.
got, source := effectiveSessionDefaultCommand("analytics", "ducklake")
if got != "SET search_path = 'analytics,memory.main'" {
t.Fatalf("command = %q, want SET search_path = 'analytics,memory.main'", got)
}
if source != sessionSearchPathSourceClient {
t.Fatalf("source = %q, want %q", source, sessionSearchPathSourceClient)
Expand Down Expand Up @@ -36,9 +51,9 @@ func TestEffectiveSessionDefaultCommandEmptyForDuckLake(t *testing.T) {

func TestPassthroughSessionDefaultCatalogCommand(t *testing.T) {
tests := []struct {
name string
name string
effectiveCatalog string
want string
want string
}{
{name: "ducklake selected", effectiveCatalog: "ducklake", want: "USE ducklake"},
{name: "iceberg selected", effectiveCatalog: "iceberg", want: "USE iceberg.public"},
Expand Down
33 changes: 31 additions & 2 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,22 @@ func (c *clientConn) serve() error {
c.sendError("FATAL", "XX000", fmt.Sprintf("failed to initialize session database metadata: %v", err))
return err
}
// InitSessionDatabaseMetadata's defer only restores the catalog for
// DuckLake sessions; it otherwise leaves the session in `memory`. For an
// Iceberg session we must issue the USE ourselves, or current_database()
// reports 'iceberg' while unqualified DDL/DML silently lands in the
// ephemeral in-memory catalog. Mirror server.setIcebergDefault and the
// control plane's effectiveSessionDefaultCommand: target
// iceberg.<DefaultSchema> (not a bare `USE iceberg`) because DuckDB
// shadows `main` on a REST catalog.
if catalog == iceberg.CatalogName {
useStmt := fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema)
if _, err := c.executor.ExecContext(initCtx, useStmt); err != nil {
initCancel()
c.sendError("FATAL", "XX000", fmt.Sprintf("failed to set iceberg as default catalog: %v", err))
return err
}
}
initCancel()
// Keep c.database aligned with the real catalog so observability surfaces
// agree with current_database(); record the physical catalog so the
Expand Down Expand Up @@ -1743,8 +1759,11 @@ func (c *clientConn) rewriteDirectQuery(query string) string {
}

func (c *clientConn) loadIcebergColumnMetadata(ctx context.Context, query string) error {
if c == nil {
return nil
}
cfg := c.effectiveIcebergConfig()
if c == nil || !shouldLoadIcebergColumnMetadata(cfg, c.passthrough) {
if !shouldLoadIcebergColumnMetadata(cfg, c.physicalCatalog, c.passthrough) {
return nil
}
return icebergmeta.LoadColumns(ctx, c.executor, query, icebergmeta.Config{
Expand All @@ -1764,8 +1783,18 @@ func (c *clientConn) effectiveIcebergConfig() IcebergConfig {
return IcebergConfig{}
}

func shouldLoadIcebergColumnMetadata(cfg IcebergConfig, passthrough bool) bool {
// shouldLoadIcebergColumnMetadata reports whether a query should trigger the
// on-demand Iceberg column-metadata load. physicalCatalog is the session's
// resolved DuckDB catalog (set during session init, and via
// SetConnectionPhysicalCatalog on the control plane). The metadata load issues
// Lakekeeper REST calls and DELETE/INSERT against the shared
// memory.main.__duckgres_iceberg_column_metadata table, so it must only run for
// Iceberg sessions — otherwise a DuckLake session on a dual-catalog worker would
// trigger cross-catalog REST I/O and churn shared state it can never read (the
// compat views gate reads on current_database()='iceberg').
func shouldLoadIcebergColumnMetadata(cfg IcebergConfig, physicalCatalog string, passthrough bool) bool {
return !passthrough &&
physicalCatalog == iceberg.CatalogName &&
cfg.Enabled &&
cfg.ResolvedBackend() == iceberg.BackendLakekeeper &&
cfg.LakekeeperOAuth2ServerURI == ""
Expand Down
21 changes: 16 additions & 5 deletions server/iceberg_column_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,30 @@ import (
)

func TestShouldLoadIcebergColumnMetadataOnlyForLakekeeper(t *testing.T) {
if !shouldLoadIcebergColumnMetadata(IcebergConfig{
lakekeeperCfg := IcebergConfig{
Enabled: true,
Backend: iceberg.BackendLakekeeper,
LakekeeperEndpoint: "http://lakekeeper.invalid/catalog",
LakekeeperWarehouse: "org-acme",
}, false) {
}
if !shouldLoadIcebergColumnMetadata(lakekeeperCfg, iceberg.CatalogName, false) {
t.Fatal("expected Lakekeeper catalog to load Iceberg column metadata")
}
if shouldLoadIcebergColumnMetadata(IcebergConfig{
Enabled: true,
Backend: iceberg.BackendLakekeeper,
}, true) {
}, iceberg.CatalogName, true) {
t.Fatal("passthrough connections should not load Iceberg column metadata")
}
// A DuckLake session on a dual-catalog worker must not trigger the Iceberg
// metadata load (cross-catalog REST I/O + shared-table churn it can't read).
if shouldLoadIcebergColumnMetadata(lakekeeperCfg, physicalDuckLakeCatalog, false) {
t.Fatal("DuckLake sessions should not load Iceberg column metadata")
}
// A session whose catalog has not yet resolved must not trigger the load.
if shouldLoadIcebergColumnMetadata(lakekeeperCfg, "", false) {
t.Fatal("sessions without a resolved Iceberg catalog should not load metadata")
}
}

func TestLoadIcebergColumnMetadataUsesConnectionIcebergConfig(t *testing.T) {
Expand All @@ -42,7 +52,7 @@ func TestLoadIcebergColumnMetadataUsesConnectionIcebergConfig(t *testing.T) {
})

cfg := cc.effectiveIcebergConfig()
if !shouldLoadIcebergColumnMetadata(cfg, false) {
if !shouldLoadIcebergColumnMetadata(cfg, iceberg.CatalogName, false) {
t.Fatalf("expected tenant Iceberg config to enable metadata loading")
}
if cfg.LakekeeperEndpoint != "http://lakekeeper.example/catalog" {
Expand Down Expand Up @@ -84,7 +94,8 @@ func TestQueryWithArgsWithMetadataLoadsIcebergColumns(t *testing.T) {

exec := &metadataTrackingExecutor{}
cc := &clientConn{
executor: exec,
executor: exec,
physicalCatalog: iceberg.CatalogName,
tenantIcebergConfig: IcebergConfig{
Enabled: true,
Backend: iceberg.BackendLakekeeper,
Expand Down
22 changes: 19 additions & 3 deletions server/icebergmeta/icebergmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ type sourceColumn struct {
}

func ShouldLoadColumns(query string) bool {
return strings.Contains(strings.ToLower(query), "information_schema_columns_compat")
lower := strings.ToLower(query)
return strings.Contains(lower, "information_schema_columns_compat") ||
strings.Contains(lower, "pg_attribute")
}

func LoadColumns(ctx context.Context, executor sqlcore.QueryExecutor, query string, configs ...Config) error {
Expand Down Expand Up @@ -79,9 +81,17 @@ func firstConfig(configs []Config) Config {

func extractTableFilter(query string) tableFilter {
lower := strings.ToLower(query)
schema := extractSingleQuotedPredicate(query, lower, "table_schema")
if schema == "" {
schema = extractSingleQuotedPredicate(query, lower, "nspname")
}
name := extractSingleQuotedPredicate(query, lower, "table_name")
if name == "" {
name = extractSingleQuotedPredicate(query, lower, "relname")
}
return tableFilter{
Schema: extractSingleQuotedPredicate(query, lower, "table_schema"),
Name: extractSingleQuotedPredicate(query, lower, "table_name"),
Schema: schema,
Name: name,
}
}

Expand All @@ -91,6 +101,12 @@ func extractSingleQuotedPredicate(query, lowerQuery, column string) string {
column + "='",
"c." + column + " = '",
"c." + column + "='",
"n." + column + " = '",
"n." + column + "='",
"pg_namespace." + column + " = '",
"pg_namespace." + column + "='",
"pg_class_full." + column + " = '",
"pg_class_full." + column + "='",
}
for _, marker := range markers {
idx := strings.Index(lowerQuery, marker)
Expand Down
79 changes: 79 additions & 0 deletions server/icebergmeta/icebergmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ func TestShouldLoadColumnsOnlyForCompatView(t *testing.T) {
}
}

func TestShouldLoadColumnsForPgAttributeDiscovery(t *testing.T) {
query := `
SELECT *
FROM memory.main.pg_namespace n
JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid
JOIN memory.main.pg_attribute a ON a.attrelid = c.oid
WHERE c.relkind IN ('r', 'p', 'v', 'f', 'm')
AND a.attnum > 0
AND NOT a.attisdropped
AND current_database() = 'iceberg'
`
if !ShouldLoadColumns(query) {
t.Fatal("expected pg_attribute metadata discovery query to require Iceberg column loading")
}
}

func TestLoadColumnsRequiresLakekeeperRESTConfig(t *testing.T) {
exec := &scriptedExecutor{
rows: []sqlcore.RowSet{
Expand Down Expand Up @@ -205,6 +221,69 @@ func TestLoadColumnsFiltersCandidatesFromSimpleCompatPredicates(t *testing.T) {
}
}

func TestLoadColumnsFiltersCandidatesFromPgCatalogPredicates(t *testing.T) {
var loadedPaths []string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v1/config":
_, _ = w.Write([]byte(`{"defaults":{"prefix":"warehouse-id"}}`))
case "/v1/warehouse-id/namespaces/stripe_test/tables/product":
loadedPaths = append(loadedPaths, r.URL.Path)
_, _ = w.Write([]byte(`{
"metadata": {
"current-schema-id": 1,
"schemas": [
{
"schema-id": 1,
"type": "struct",
"fields": [
{"id": 1, "name": "id", "type": "string", "required": true}
]
}
]
}
}`))
default:
t.Fatalf("unexpected REST path: %s", r.URL.Path)
}
}))
defer srv.Close()

exec := &scriptedExecutor{
rows: []sqlcore.RowSet{
&rowSet{cols: []string{"table_schema", "table_name"}, rows: [][]any{
{"stripe_test", "product"},
}},
},
}

err := LoadColumns(context.Background(), exec, `
SELECT *
FROM memory.main.pg_namespace n
JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid
JOIN memory.main.pg_attribute a ON a.attrelid = c.oid
WHERE n.nspname = 'stripe_test'
AND c.relname = 'product'
`, Config{
LakekeeperEndpoint: srv.URL,
LakekeeperWarehouse: "org-acme",
})
if err != nil {
t.Fatalf("LoadColumns: %v", err)
}

listQuery := strings.Join(exec.queries, "\n")
if !strings.Contains(listQuery, "table_schema = 'stripe_test'") {
t.Fatalf("candidate query did not filter table_schema:\n%s", listQuery)
}
if !strings.Contains(listQuery, "table_name = 'product'") {
t.Fatalf("candidate query did not filter table_name:\n%s", listQuery)
}
if got, want := len(loadedPaths), 1; got != want {
t.Fatalf("REST table loads = %d, want %d", got, want)
}
}

func TestLoadColumnsReturnsLakekeeperRESTUnavailableError(t *testing.T) {
srv := httptest.NewServer(http.NotFoundHandler())
defer srv.Close()
Expand Down
15 changes: 4 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,17 +1224,10 @@ func ActivateDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, use
return fmt.Errorf("iceberg catalog configured but attachment failed: %w", err)
}

// The pg_class/pg_namespace recreation and the duckLakeMode information_schema
// reflect the DuckLake catalog; skip the DuckLake-specific rewrites when
// there's no DuckLake attached.
if !icebergOnly {
if err := recreatePgClassForDuckLake(db); err != nil {
slog.Warn("Failed to recreate pg_class_full for DuckLake during activation.", "user", username, "error", err)
}
if err := recreatePgNamespaceForDuckLake(db); err != nil {
slog.Warn("Failed to recreate pg_namespace for DuckLake during activation.", "user", username, "error", err)
}
}
// Catalog-scoped pg_class/pg_namespace/pg_attribute views are installed by
// sessionmeta.InitSessionDatabaseMetadata for each Postgres session. Do not
// install DuckLake-only global pg_catalog views here; dual-catalog workers
// must not leak DuckLake metadata into Iceberg sessions.
if err := initInformationSchema(db, !icebergOnly); err != nil {
slog.Warn("Failed to initialize information_schema during activation.", "user", username, "error", err)
}
Expand Down
Loading
Loading