diff --git a/controlplane/session_search_path.go b/controlplane/session_search_path.go index e0aa2c39..dd05d698 100644 --- a/controlplane/session_search_path.go +++ b/controlplane/session_search_path.go @@ -26,17 +26,37 @@ const physicalIcebergCatalog = iceberg.CatalogName // effectiveSessionDefaultCommand returns the connect-time command for a // non-passthrough session, given the resolved real catalog the session defaults -// to (effectiveCatalog, one of "ducklake"/"iceberg"). A client-supplied -// search_path always wins. For DuckLake the catalog switch is owned by -// InitSessionDatabaseMetadata's defer (which also restores memory.main on the -// search_path so the pg_catalog compat macros stay resolvable), so this returns -// "" — re-issuing `USE ducklake` here would clobber that search_path. +// to (effectiveCatalog, one of "ducklake"/"iceberg"). +// +// For DuckLake the catalog switch is owned by InitSessionDatabaseMetadata's +// defer (which also restores memory.main on the search_path so the pg_catalog +// compat macros stay resolvable), so a DuckLake session only needs a command +// when the client supplied its own search_path. +// +// For Iceberg there is no such defer, so the `USE iceberg.` catalog +// switch MUST be issued here — even when the client also supplied a search_path. +// Otherwise the session stays in the ephemeral `memory` catalog while +// current_database() reports 'iceberg', and unqualified DDL/DML silently misses +// the warehouse. The Iceberg USE is load-bearing, so when it is combined with a +// client search_path the command fails closed (sessionDefaultSourceConfiguredCatalog) +// rather than treating the whole thing as a best-effort search_path. func effectiveSessionDefaultCommand(clientSearchPath, effectiveCatalog string) (string, sessionSearchPathSource) { + icebergUse := "" + if effectiveCatalog == iceberg.CatalogName { + icebergUse = fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema) + } + switch { case clientSearchPath != "": - return fmt.Sprintf("SET search_path = '%s'", ensureMemoryMainInSearchPath(clientSearchPath)), sessionSearchPathSourceClient - case effectiveCatalog == iceberg.CatalogName: - return fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema), sessionDefaultSourceConfiguredCatalog + searchPath := fmt.Sprintf("SET search_path = '%s'", ensureMemoryMainInSearchPath(clientSearchPath)) + if icebergUse != "" { + // Switch into Iceberg first, then apply the client search_path (the + // USE resets it). The catalog switch is fail-closed. + return icebergUse + "; " + searchPath, sessionDefaultSourceConfiguredCatalog + } + return searchPath, sessionSearchPathSourceClient + case icebergUse != "": + return icebergUse, sessionDefaultSourceConfiguredCatalog default: return "", "" } diff --git a/controlplane/session_search_path_test.go b/controlplane/session_search_path_test.go index 26066bfe..f84f2980 100644 --- a/controlplane/session_search_path_test.go +++ b/controlplane/session_search_path_test.go @@ -2,10 +2,25 @@ package controlplane import "testing" -func TestEffectiveSessionDefaultCommandUsesClientSearchPathBeforeCatalog(t *testing.T) { - got, source := effectiveSessionDefaultCommand("ducklake.main", "iceberg") - if got != "SET search_path = 'ducklake.main,memory.main'" { - t.Fatalf("command = %q, want SET search_path = 'ducklake.main,memory.main'", got) +func TestEffectiveSessionDefaultCommandIcebergSwitchSurvivesClientSearchPath(t *testing.T) { + // An Iceberg session with a client-supplied search_path must STILL switch + // into the Iceberg catalog (there is no InitSessionDatabaseMetadata defer to + // do it). The catalog switch precedes the search_path and is fail-closed. + got, source := effectiveSessionDefaultCommand("public", "iceberg") + if got != "USE iceberg.public; SET search_path = 'public,memory.main'" { + t.Fatalf("command = %q, want USE iceberg.public; SET search_path = 'public,memory.main'", got) + } + if source != sessionDefaultSourceConfiguredCatalog { + t.Fatalf("source = %q, want %q", source, sessionDefaultSourceConfiguredCatalog) + } +} + +func TestEffectiveSessionDefaultCommandDuckLakeClientSearchPathOnly(t *testing.T) { + // DuckLake's catalog switch is owned by InitSessionDatabaseMetadata's defer, + // so a client search_path is applied alone and best-effort. + got, source := effectiveSessionDefaultCommand("analytics", "ducklake") + if got != "SET search_path = 'analytics,memory.main'" { + t.Fatalf("command = %q, want SET search_path = 'analytics,memory.main'", got) } if source != sessionSearchPathSourceClient { t.Fatalf("source = %q, want %q", source, sessionSearchPathSourceClient) @@ -36,9 +51,9 @@ func TestEffectiveSessionDefaultCommandEmptyForDuckLake(t *testing.T) { func TestPassthroughSessionDefaultCatalogCommand(t *testing.T) { tests := []struct { - name string + name string effectiveCatalog string - want string + want string }{ {name: "ducklake selected", effectiveCatalog: "ducklake", want: "USE ducklake"}, {name: "iceberg selected", effectiveCatalog: "iceberg", want: "USE iceberg.public"}, diff --git a/server/conn.go b/server/conn.go index c94deb8d..6d06a419 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1091,6 +1091,22 @@ func (c *clientConn) serve() error { c.sendError("FATAL", "XX000", fmt.Sprintf("failed to initialize session database metadata: %v", err)) return err } + // InitSessionDatabaseMetadata's defer only restores the catalog for + // DuckLake sessions; it otherwise leaves the session in `memory`. For an + // Iceberg session we must issue the USE ourselves, or current_database() + // reports 'iceberg' while unqualified DDL/DML silently lands in the + // ephemeral in-memory catalog. Mirror server.setIcebergDefault and the + // control plane's effectiveSessionDefaultCommand: target + // iceberg. (not a bare `USE iceberg`) because DuckDB + // shadows `main` on a REST catalog. + if catalog == iceberg.CatalogName { + useStmt := fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema) + if _, err := c.executor.ExecContext(initCtx, useStmt); err != nil { + initCancel() + c.sendError("FATAL", "XX000", fmt.Sprintf("failed to set iceberg as default catalog: %v", err)) + return err + } + } initCancel() // Keep c.database aligned with the real catalog so observability surfaces // agree with current_database(); record the physical catalog so the @@ -1743,8 +1759,11 @@ func (c *clientConn) rewriteDirectQuery(query string) string { } func (c *clientConn) loadIcebergColumnMetadata(ctx context.Context, query string) error { + if c == nil { + return nil + } cfg := c.effectiveIcebergConfig() - if c == nil || !shouldLoadIcebergColumnMetadata(cfg, c.passthrough) { + if !shouldLoadIcebergColumnMetadata(cfg, c.physicalCatalog, c.passthrough) { return nil } return icebergmeta.LoadColumns(ctx, c.executor, query, icebergmeta.Config{ @@ -1764,8 +1783,18 @@ func (c *clientConn) effectiveIcebergConfig() IcebergConfig { return IcebergConfig{} } -func shouldLoadIcebergColumnMetadata(cfg IcebergConfig, passthrough bool) bool { +// shouldLoadIcebergColumnMetadata reports whether a query should trigger the +// on-demand Iceberg column-metadata load. physicalCatalog is the session's +// resolved DuckDB catalog (set during session init, and via +// SetConnectionPhysicalCatalog on the control plane). The metadata load issues +// Lakekeeper REST calls and DELETE/INSERT against the shared +// memory.main.__duckgres_iceberg_column_metadata table, so it must only run for +// Iceberg sessions — otherwise a DuckLake session on a dual-catalog worker would +// trigger cross-catalog REST I/O and churn shared state it can never read (the +// compat views gate reads on current_database()='iceberg'). +func shouldLoadIcebergColumnMetadata(cfg IcebergConfig, physicalCatalog string, passthrough bool) bool { return !passthrough && + physicalCatalog == iceberg.CatalogName && cfg.Enabled && cfg.ResolvedBackend() == iceberg.BackendLakekeeper && cfg.LakekeeperOAuth2ServerURI == "" diff --git a/server/iceberg_column_metadata_test.go b/server/iceberg_column_metadata_test.go index 9ac60a1d..d7d7ed3f 100644 --- a/server/iceberg_column_metadata_test.go +++ b/server/iceberg_column_metadata_test.go @@ -14,20 +14,30 @@ import ( ) func TestShouldLoadIcebergColumnMetadataOnlyForLakekeeper(t *testing.T) { - if !shouldLoadIcebergColumnMetadata(IcebergConfig{ + lakekeeperCfg := IcebergConfig{ Enabled: true, Backend: iceberg.BackendLakekeeper, LakekeeperEndpoint: "http://lakekeeper.invalid/catalog", LakekeeperWarehouse: "org-acme", - }, false) { + } + if !shouldLoadIcebergColumnMetadata(lakekeeperCfg, iceberg.CatalogName, false) { t.Fatal("expected Lakekeeper catalog to load Iceberg column metadata") } if shouldLoadIcebergColumnMetadata(IcebergConfig{ Enabled: true, Backend: iceberg.BackendLakekeeper, - }, true) { + }, iceberg.CatalogName, true) { t.Fatal("passthrough connections should not load Iceberg column metadata") } + // A DuckLake session on a dual-catalog worker must not trigger the Iceberg + // metadata load (cross-catalog REST I/O + shared-table churn it can't read). + if shouldLoadIcebergColumnMetadata(lakekeeperCfg, physicalDuckLakeCatalog, false) { + t.Fatal("DuckLake sessions should not load Iceberg column metadata") + } + // A session whose catalog has not yet resolved must not trigger the load. + if shouldLoadIcebergColumnMetadata(lakekeeperCfg, "", false) { + t.Fatal("sessions without a resolved Iceberg catalog should not load metadata") + } } func TestLoadIcebergColumnMetadataUsesConnectionIcebergConfig(t *testing.T) { @@ -42,7 +52,7 @@ func TestLoadIcebergColumnMetadataUsesConnectionIcebergConfig(t *testing.T) { }) cfg := cc.effectiveIcebergConfig() - if !shouldLoadIcebergColumnMetadata(cfg, false) { + if !shouldLoadIcebergColumnMetadata(cfg, iceberg.CatalogName, false) { t.Fatalf("expected tenant Iceberg config to enable metadata loading") } if cfg.LakekeeperEndpoint != "http://lakekeeper.example/catalog" { @@ -84,7 +94,8 @@ func TestQueryWithArgsWithMetadataLoadsIcebergColumns(t *testing.T) { exec := &metadataTrackingExecutor{} cc := &clientConn{ - executor: exec, + executor: exec, + physicalCatalog: iceberg.CatalogName, tenantIcebergConfig: IcebergConfig{ Enabled: true, Backend: iceberg.BackendLakekeeper, diff --git a/server/icebergmeta/icebergmeta.go b/server/icebergmeta/icebergmeta.go index 351834f2..60e2c2bc 100644 --- a/server/icebergmeta/icebergmeta.go +++ b/server/icebergmeta/icebergmeta.go @@ -36,7 +36,9 @@ type sourceColumn struct { } func ShouldLoadColumns(query string) bool { - return strings.Contains(strings.ToLower(query), "information_schema_columns_compat") + lower := strings.ToLower(query) + return strings.Contains(lower, "information_schema_columns_compat") || + strings.Contains(lower, "pg_attribute") } func LoadColumns(ctx context.Context, executor sqlcore.QueryExecutor, query string, configs ...Config) error { @@ -79,9 +81,17 @@ func firstConfig(configs []Config) Config { func extractTableFilter(query string) tableFilter { lower := strings.ToLower(query) + schema := extractSingleQuotedPredicate(query, lower, "table_schema") + if schema == "" { + schema = extractSingleQuotedPredicate(query, lower, "nspname") + } + name := extractSingleQuotedPredicate(query, lower, "table_name") + if name == "" { + name = extractSingleQuotedPredicate(query, lower, "relname") + } return tableFilter{ - Schema: extractSingleQuotedPredicate(query, lower, "table_schema"), - Name: extractSingleQuotedPredicate(query, lower, "table_name"), + Schema: schema, + Name: name, } } @@ -91,6 +101,12 @@ func extractSingleQuotedPredicate(query, lowerQuery, column string) string { column + "='", "c." + column + " = '", "c." + column + "='", + "n." + column + " = '", + "n." + column + "='", + "pg_namespace." + column + " = '", + "pg_namespace." + column + "='", + "pg_class_full." + column + " = '", + "pg_class_full." + column + "='", } for _, marker := range markers { idx := strings.Index(lowerQuery, marker) diff --git a/server/icebergmeta/icebergmeta_test.go b/server/icebergmeta/icebergmeta_test.go index e715f015..d5f4285d 100644 --- a/server/icebergmeta/icebergmeta_test.go +++ b/server/icebergmeta/icebergmeta_test.go @@ -22,6 +22,22 @@ func TestShouldLoadColumnsOnlyForCompatView(t *testing.T) { } } +func TestShouldLoadColumnsForPgAttributeDiscovery(t *testing.T) { + query := ` + SELECT * + FROM memory.main.pg_namespace n + JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid + JOIN memory.main.pg_attribute a ON a.attrelid = c.oid + WHERE c.relkind IN ('r', 'p', 'v', 'f', 'm') + AND a.attnum > 0 + AND NOT a.attisdropped + AND current_database() = 'iceberg' + ` + if !ShouldLoadColumns(query) { + t.Fatal("expected pg_attribute metadata discovery query to require Iceberg column loading") + } +} + func TestLoadColumnsRequiresLakekeeperRESTConfig(t *testing.T) { exec := &scriptedExecutor{ rows: []sqlcore.RowSet{ @@ -205,6 +221,69 @@ func TestLoadColumnsFiltersCandidatesFromSimpleCompatPredicates(t *testing.T) { } } +func TestLoadColumnsFiltersCandidatesFromPgCatalogPredicates(t *testing.T) { + var loadedPaths []string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/v1/config": + _, _ = w.Write([]byte(`{"defaults":{"prefix":"warehouse-id"}}`)) + case "/v1/warehouse-id/namespaces/stripe_test/tables/product": + loadedPaths = append(loadedPaths, r.URL.Path) + _, _ = w.Write([]byte(`{ + "metadata": { + "current-schema-id": 1, + "schemas": [ + { + "schema-id": 1, + "type": "struct", + "fields": [ + {"id": 1, "name": "id", "type": "string", "required": true} + ] + } + ] + } + }`)) + default: + t.Fatalf("unexpected REST path: %s", r.URL.Path) + } + })) + defer srv.Close() + + exec := &scriptedExecutor{ + rows: []sqlcore.RowSet{ + &rowSet{cols: []string{"table_schema", "table_name"}, rows: [][]any{ + {"stripe_test", "product"}, + }}, + }, + } + + err := LoadColumns(context.Background(), exec, ` + SELECT * + FROM memory.main.pg_namespace n + JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid + JOIN memory.main.pg_attribute a ON a.attrelid = c.oid + WHERE n.nspname = 'stripe_test' + AND c.relname = 'product' + `, Config{ + LakekeeperEndpoint: srv.URL, + LakekeeperWarehouse: "org-acme", + }) + if err != nil { + t.Fatalf("LoadColumns: %v", err) + } + + listQuery := strings.Join(exec.queries, "\n") + if !strings.Contains(listQuery, "table_schema = 'stripe_test'") { + t.Fatalf("candidate query did not filter table_schema:\n%s", listQuery) + } + if !strings.Contains(listQuery, "table_name = 'product'") { + t.Fatalf("candidate query did not filter table_name:\n%s", listQuery) + } + if got, want := len(loadedPaths), 1; got != want { + t.Fatalf("REST table loads = %d, want %d", got, want) + } +} + func TestLoadColumnsReturnsLakekeeperRESTUnavailableError(t *testing.T) { srv := httptest.NewServer(http.NotFoundHandler()) defer srv.Close() diff --git a/server/server.go b/server/server.go index feb63df4..46d2d04a 100644 --- a/server/server.go +++ b/server/server.go @@ -1224,17 +1224,10 @@ func ActivateDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, use return fmt.Errorf("iceberg catalog configured but attachment failed: %w", err) } - // The pg_class/pg_namespace recreation and the duckLakeMode information_schema - // reflect the DuckLake catalog; skip the DuckLake-specific rewrites when - // there's no DuckLake attached. - if !icebergOnly { - if err := recreatePgClassForDuckLake(db); err != nil { - slog.Warn("Failed to recreate pg_class_full for DuckLake during activation.", "user", username, "error", err) - } - if err := recreatePgNamespaceForDuckLake(db); err != nil { - slog.Warn("Failed to recreate pg_namespace for DuckLake during activation.", "user", username, "error", err) - } - } + // Catalog-scoped pg_class/pg_namespace/pg_attribute views are installed by + // sessionmeta.InitSessionDatabaseMetadata for each Postgres session. Do not + // install DuckLake-only global pg_catalog views here; dual-catalog workers + // must not leak DuckLake metadata into Iceberg sessions. if err := initInformationSchema(db, !icebergOnly); err != nil { slog.Warn("Failed to initialize information_schema during activation.", "user", username, "error", err) } diff --git a/server/session_database_metadata_test.go b/server/session_database_metadata_test.go index e2990d58..24ed2054 100644 --- a/server/session_database_metadata_test.go +++ b/server/session_database_metadata_test.go @@ -4,7 +4,9 @@ import ( "context" "database/sql" "errors" + "strings" "testing" + "time" _ "github.com/duckdb/duckdb-go/v2" "github.com/posthog/duckgres/server/sessionmeta" @@ -359,6 +361,357 @@ func TestInformationSchemaCompatViewsOnlyExposeSelectedCatalog(t *testing.T) { } } +func TestPgCatalogCompatViewsOnlyExposeSelectedCatalog(t *testing.T) { + tests := []struct { + name string + selected string + useAfterInit string + wantTable string + rejectTable string + wantColumnRel string + rejectColumn string + }{ + { + name: "ducklake session sees ducklake only", + selected: "ducklake", + wantTable: "duck_only", + rejectTable: "ice_only", + wantColumnRel: "duck_only", + rejectColumn: "ice_only", + }, + { + name: "iceberg session sees iceberg only", + selected: "iceberg", + useAfterInit: "USE iceberg.public", + wantTable: "ice_only", + rejectTable: "duck_only", + wantColumnRel: "ice_only", + rejectColumn: "duck_only", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + db, err := sql.Open("duckdb", ":memory:") + if err != nil { + t.Fatalf("open duckdb: %v", err) + } + db.SetMaxOpenConns(1) + defer func() { _ = db.Close() }() + + if err := initPgCatalog(db, time.Now(), time.Now(), "test", "test"); err != nil { + t.Fatalf("init pg catalog: %v", err) + } + if _, err := db.Exec(`ATTACH ':memory:' AS ducklake`); err != nil { + t.Fatalf("attach ducklake: %v", err) + } + if _, err := db.Exec(`ATTACH ':memory:' AS iceberg`); err != nil { + t.Fatalf("attach iceberg: %v", err) + } + + if _, err := db.Exec("USE ducklake"); err != nil { + t.Fatalf("use ducklake: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA shared"); err != nil { + t.Fatalf("create ducklake schema: %v", err) + } + if _, err := db.Exec("CREATE TABLE shared.duck_only(id INTEGER, payload VARCHAR)"); err != nil { + t.Fatalf("create ducklake table: %v", err) + } + + if _, err := db.Exec("CREATE SCHEMA iceberg.public"); err != nil { + t.Fatalf("create iceberg public schema: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA iceberg.shared"); err != nil { + t.Fatalf("create iceberg schema: %v", err) + } + if _, err := db.Exec("CREATE TABLE iceberg.shared.ice_only(id INTEGER, payload VARCHAR)"); err != nil { + t.Fatalf("create iceberg table: %v", err) + } + if _, err := db.Exec("USE memory"); err != nil { + t.Fatalf("use memory before pg catalog rewrite: %v", err) + } + if err := recreatePgClassForDuckLake(db); err != nil { + t.Fatalf("recreate pg_class_full for ducklake: %v", err) + } + if err := recreatePgNamespaceForDuckLake(db); err != nil { + t.Fatalf("recreate pg_namespace for ducklake: %v", err) + } + + executor := NewLocalExecutor(db) + if err := sessionmeta.InitSessionDatabaseMetadata(context.Background(), executor, tc.selected); err != nil { + t.Fatalf("init session database metadata: %v", err) + } + if tc.useAfterInit != "" { + if _, err := db.Exec(tc.useAfterInit); err != nil { + t.Fatalf("apply selected catalog: %v", err) + } + } + + var schemaCount int + if err := db.QueryRow(` + SELECT COUNT(*) + FROM memory.main.pg_namespace + WHERE nspname = 'shared' + `).Scan(&schemaCount); err != nil { + t.Fatalf("query pg_namespace: %v", err) + } + if schemaCount != 1 { + t.Fatalf("shared schema count = %d, want 1", schemaCount) + } + + var tables string + if err := db.QueryRow(` + SELECT string_agg(c.relname, ',' ORDER BY c.relname) + FROM memory.main.pg_namespace n + JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid + WHERE n.nspname = 'shared' + AND c.relkind IN ('r', 'v', 'f', 'm') + `).Scan(&tables); err != nil { + t.Fatalf("query pg_class_full: %v", err) + } + if tables != tc.wantTable { + t.Fatalf("tables = %q, want %q and not %q", tables, tc.wantTable, tc.rejectTable) + } + if strings.Contains(tables, tc.rejectTable) { + t.Fatalf("tables leaked %q: %q", tc.rejectTable, tables) + } + + var columnRelations string + if err := db.QueryRow(` + SELECT string_agg(DISTINCT c.relname, ',' ORDER BY c.relname) + FROM memory.main.pg_namespace n + JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid + JOIN memory.main.pg_attribute a ON a.attrelid = c.oid + WHERE n.nspname = 'shared' + AND a.attname = 'id' + AND a.attnum > 0 + AND NOT a.attisdropped + `).Scan(&columnRelations); err != nil { + t.Fatalf("query pg_attribute join: %v", err) + } + if columnRelations != tc.wantColumnRel { + t.Fatalf("column relations = %q, want %q and not %q", columnRelations, tc.wantColumnRel, tc.rejectColumn) + } + if strings.Contains(columnRelations, tc.rejectColumn) { + t.Fatalf("column relations leaked %q: %q", tc.rejectColumn, columnRelations) + } + }) + } +} + +func TestHexMetadataQueryDoesNotLeakDuckLakeIntoIceberg(t *testing.T) { + db, err := sql.Open("duckdb", ":memory:") + if err != nil { + t.Fatalf("open duckdb: %v", err) + } + db.SetMaxOpenConns(1) + defer func() { _ = db.Close() }() + + if err := initPgCatalog(db, time.Now(), time.Now(), "test", "test"); err != nil { + t.Fatalf("init pg catalog: %v", err) + } + if _, err := db.Exec(`ATTACH ':memory:' AS ducklake`); err != nil { + t.Fatalf("attach ducklake: %v", err) + } + if _, err := db.Exec(`ATTACH ':memory:' AS iceberg`); err != nil { + t.Fatalf("attach iceberg: %v", err) + } + if _, err := db.Exec("USE ducklake"); err != nil { + t.Fatalf("use ducklake: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA stripe"); err != nil { + t.Fatalf("create ducklake stripe: %v", err) + } + if _, err := db.Exec("CREATE TABLE stripe.cash_balance(id INTEGER)"); err != nil { + t.Fatalf("create ducklake table: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA iceberg.public"); err != nil { + t.Fatalf("create iceberg public: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA iceberg.stripe"); err != nil { + t.Fatalf("create iceberg stripe: %v", err) + } + if _, err := db.Exec("CREATE TABLE iceberg.stripe.product(id INTEGER)"); err != nil { + t.Fatalf("create iceberg table: %v", err) + } + if _, err := db.Exec("USE memory"); err != nil { + t.Fatalf("use memory before pg catalog rewrite: %v", err) + } + if err := recreatePgClassForDuckLake(db); err != nil { + t.Fatalf("recreate pg_class_full for ducklake: %v", err) + } + if err := recreatePgNamespaceForDuckLake(db); err != nil { + t.Fatalf("recreate pg_namespace for ducklake: %v", err) + } + + executor := NewLocalExecutor(db) + if err := sessionmeta.InitSessionDatabaseMetadata(context.Background(), executor, "iceberg"); err != nil { + t.Fatalf("init session metadata: %v", err) + } + if _, err := db.Exec("USE iceberg.public"); err != nil { + t.Fatalf("use iceberg public: %v", err) + } + + var tables string + if err := db.QueryRow(` + SELECT string_agg("TABLE_NAME", ',' ORDER BY "TABLE_NAME") + FROM ( + SELECT current_database() AS "TABLE_CAT", + n.nspname AS "TABLE_SCHEM", + c.relname AS "TABLE_NAME" + FROM memory.main.pg_namespace n, + memory.main.pg_class_full c + WHERE c.relnamespace = n.oid + AND current_database() = 'iceberg' + AND c.relkind = 'r' + AND n.nspname = 'stripe' + ) q + `).Scan(&tables); err != nil { + t.Fatalf("hex table query: %v", err) + } + if tables != "product" { + t.Fatalf("Hex table query returned %q, want product only", tables) + } +} + +func TestPgTablesViewsSequencesOnlyExposeSelectedCatalog(t *testing.T) { + // DuckDB's native pg_tables/pg_views/pg_sequences span every attached + // catalog and ignore current_database(); the session-scoped compat views + // must return only the selected catalog's objects so a dual-catalog worker + // does not leak the other catalog's object names through these discovery + // surfaces. + tests := []struct { + name string + selected string + useAfterInit string + wantTable string + wantView string + wantSequence string + rejectTable string + }{ + { + name: "ducklake session sees ducklake only", + selected: "ducklake", + wantTable: "duck_only", + wantView: "duck_view", + wantSequence: "duck_seq", + rejectTable: "ice_only", + }, + { + name: "iceberg session sees iceberg only", + selected: "iceberg", + useAfterInit: "USE iceberg.public", + wantTable: "ice_only", + wantView: "ice_view", + wantSequence: "ice_seq", + rejectTable: "duck_only", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + db, err := sql.Open("duckdb", ":memory:") + if err != nil { + t.Fatalf("open duckdb: %v", err) + } + db.SetMaxOpenConns(1) + defer func() { _ = db.Close() }() + + if _, err := db.Exec(`ATTACH ':memory:' AS ducklake`); err != nil { + t.Fatalf("attach ducklake: %v", err) + } + if _, err := db.Exec(`ATTACH ':memory:' AS iceberg`); err != nil { + t.Fatalf("attach iceberg: %v", err) + } + + if _, err := db.Exec("USE ducklake"); err != nil { + t.Fatalf("use ducklake: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA shared"); err != nil { + t.Fatalf("create ducklake schema: %v", err) + } + if _, err := db.Exec("CREATE TABLE shared.duck_only(id INTEGER)"); err != nil { + t.Fatalf("create ducklake table: %v", err) + } + if _, err := db.Exec("CREATE VIEW shared.duck_view AS SELECT id FROM shared.duck_only"); err != nil { + t.Fatalf("create ducklake view: %v", err) + } + if _, err := db.Exec("CREATE SEQUENCE shared.duck_seq"); err != nil { + t.Fatalf("create ducklake sequence: %v", err) + } + + if _, err := db.Exec("CREATE SCHEMA iceberg.public"); err != nil { + t.Fatalf("create iceberg public schema: %v", err) + } + if _, err := db.Exec("CREATE SCHEMA iceberg.shared"); err != nil { + t.Fatalf("create iceberg schema: %v", err) + } + if _, err := db.Exec("CREATE TABLE iceberg.shared.ice_only(id INTEGER)"); err != nil { + t.Fatalf("create iceberg table: %v", err) + } + if _, err := db.Exec("CREATE VIEW iceberg.shared.ice_view AS SELECT id FROM iceberg.shared.ice_only"); err != nil { + t.Fatalf("create iceberg view: %v", err) + } + if _, err := db.Exec("CREATE SEQUENCE iceberg.shared.ice_seq"); err != nil { + t.Fatalf("create iceberg sequence: %v", err) + } + if _, err := db.Exec("USE memory"); err != nil { + t.Fatalf("use memory: %v", err) + } + + executor := NewLocalExecutor(db) + if err := sessionmeta.InitSessionDatabaseMetadata(context.Background(), executor, tc.selected); err != nil { + t.Fatalf("init session database metadata: %v", err) + } + if tc.useAfterInit != "" { + if _, err := db.Exec(tc.useAfterInit); err != nil { + t.Fatalf("apply selected catalog: %v", err) + } + } + + assertSingleValue := func(label, query, want string) { + t.Helper() + var got string + if err := db.QueryRow(query).Scan(&got); err != nil { + t.Fatalf("%s query: %v", label, err) + } + if got != want { + t.Fatalf("%s = %q, want %q", label, got, want) + } + } + + assertSingleValue("pg_tables", ` + SELECT string_agg(tablename, ',' ORDER BY tablename) + FROM memory.main.pg_tables + WHERE schemaname = 'shared' + `, tc.wantTable) + assertSingleValue("pg_views", ` + SELECT string_agg(viewname, ',' ORDER BY viewname) + FROM memory.main.pg_views + WHERE schemaname = 'shared' + `, tc.wantView) + assertSingleValue("pg_sequences", ` + SELECT string_agg(sequencename, ',' ORDER BY sequencename) + FROM memory.main.pg_sequences + WHERE schemaname = 'shared' + `, tc.wantSequence) + + var leakCount int + if err := db.QueryRow(` + SELECT COUNT(*) + FROM memory.main.pg_tables + WHERE schemaname = 'shared' AND tablename = ? + `, tc.rejectTable).Scan(&leakCount); err != nil { + t.Fatalf("pg_tables leak query: %v", err) + } + if leakCount != 0 { + t.Fatalf("pg_tables leaked %q into %s session", tc.rejectTable, tc.selected) + } + }) + } +} + func TestInformationSchemaColumnsCompatScopesLoadedMetadataToSelectedCatalog(t *testing.T) { db, err := sql.Open("duckdb", ":memory:") if err != nil { @@ -391,6 +744,9 @@ func TestInformationSchemaColumnsCompatScopesLoadedMetadataToSelectedCatalog(t * if _, err := db.Exec("CREATE SCHEMA iceberg.public"); err != nil { t.Fatalf("create iceberg public schema: %v", err) } + if _, err := db.Exec("CREATE TABLE iceberg.billing_public.public_api_keys(__ VARCHAR)"); err != nil { + t.Fatalf("create iceberg dummy table: %v", err) + } executor := NewLocalExecutor(db) if err := sessionmeta.InitSessionDatabaseMetadata(context.Background(), executor, "ducklake"); err != nil { @@ -479,4 +835,21 @@ func TestInformationSchemaColumnsCompatScopesLoadedMetadataToSelectedCatalog(t * t.Fatalf("use iceberg.public: %v", err) } assertColumns("iceberg.billing_public,memory.main", "iceberg", "YES", "text") + + var pgAttributeColumns string + if err := db.QueryRow(` + SELECT string_agg(a.attname, ',' ORDER BY a.attnum) + FROM memory.main.pg_namespace n + JOIN memory.main.pg_class_full c ON c.relnamespace = n.oid + JOIN memory.main.pg_attribute a ON a.attrelid = c.oid + WHERE n.nspname = 'billing_public' + AND c.relname = 'public_api_keys' + AND a.attnum > 0 + AND NOT a.attisdropped + `).Scan(&pgAttributeColumns); err != nil { + t.Fatalf("query loaded iceberg pg_attribute columns: %v", err) + } + if pgAttributeColumns != "id,permissions" { + t.Fatalf("loaded iceberg pg_attribute columns = %q, want id,permissions", pgAttributeColumns) + } } diff --git a/server/sessionmeta/sessionmeta.go b/server/sessionmeta/sessionmeta.go index 72188eb6..5a8759ad 100644 --- a/server/sessionmeta/sessionmeta.go +++ b/server/sessionmeta/sessionmeta.go @@ -136,6 +136,12 @@ func buildSessionMetadataSQL(database string) string { sessionColumnMetadataTableSQL(), sessionIcebergColumnMetadataTableSQL(), buildSessionPgDatabaseViewSQL(database), + buildSessionPgClassViewSQL(), + buildSessionPgNamespaceViewSQL(), + buildSessionPgAttributeViewSQL(), + buildSessionPgTablesViewSQL(), + buildSessionPgViewsViewSQL(), + buildSessionPgSequencesViewSQL(), buildSessionInformationSchemaColumnsViewSQL(), buildSessionInformationSchemaTablesViewSQL(), buildSessionInformationSchemaSchemataViewSQL(), @@ -234,6 +240,469 @@ func buildSessionPgDatabaseViewSQL(database string) string { `, lit, lit, lit, lit, lit, lit, lit) } +func internalCompatRelationNamesSQL() string { + return ` + 'pg_database', 'pg_class_full', 'pg_collation', 'pg_policy', 'pg_roles', + 'pg_statistic_ext', 'pg_publication_tables', 'pg_rules', 'pg_publication', + 'pg_publication_rel', 'pg_inherits', 'pg_namespace', 'pg_matviews', + 'pg_stat_user_tables', 'pg_statio_user_tables', 'pg_stat_statements', 'pg_stat_activity', + 'pg_partitioned_table', 'pg_rewrite', 'pg_type', 'pg_attribute', + 'pg_tables', 'pg_views', 'pg_sequences', + 'information_schema_columns_compat', 'information_schema_tables_compat', + 'information_schema_schemata_compat', 'information_schema_views_compat', + '__duckgres_column_metadata', '__duckgres_iceberg_column_metadata' + ` +} + +func buildSessionPgClassViewSQL() string { + internalNames := internalCompatRelationNamesSQL() + return fmt.Sprintf(` + CREATE OR REPLACE VIEW main.pg_class_full AS + WITH active_catalog AS ( + SELECT current_database() AS catalog + ), + relations AS ( + SELECT + table_oid AS oid, + table_name AS relname, + schema_oid AS relnamespace, + 0 AS reltype, + 0 AS reloftype, + 0 AS relowner, + 0 AS relam, + 0 AS relfilenode, + 0 AS reltablespace, + 0 AS relpages, + CAST(estimated_size AS FLOAT) AS reltuples, + 0 AS relallvisible, + 0 AS reltoastrelid, + 0::BIGINT AS reltoastidxid, + (index_count > 0) AS relhasindex, + false AS relisshared, + CASE WHEN temporary THEN 't' ELSE 'p' END AS relpersistence, + 'r' AS relkind, + column_count AS relnatts, + check_constraint_count AS relchecks, + false AS relhasoids, + has_primary_key AS relhaspkey, + false AS relhasrules, + false AS relhastriggers, + false AS relhassubclass, + false AS relrowsecurity, + false AS relforcerowsecurity, + true AS relispopulated, + NULL AS relreplident, + false AS relispartition, + 0 AS relrewrite, + 0 AS relfrozenxid, + NULL AS relminmxid, + NULL AS relacl, + NULL AS reloptions, + NULL AS relpartbound, + database_name + FROM duckdb_tables() + WHERE table_name NOT IN (%s) + UNION ALL + SELECT + view_oid AS oid, + view_name AS relname, + schema_oid AS relnamespace, + 0 AS reltype, + 0 AS reloftype, + 0 AS relowner, + 0 AS relam, + 0 AS relfilenode, + 0 AS reltablespace, + 0 AS relpages, + 0 AS reltuples, + 0 AS relallvisible, + 0 AS reltoastrelid, + 0::BIGINT AS reltoastidxid, + false AS relhasindex, + false AS relisshared, + CASE WHEN temporary THEN 't' ELSE 'p' END AS relpersistence, + 'v' AS relkind, + column_count AS relnatts, + 0 AS relchecks, + false AS relhasoids, + false AS relhaspkey, + false AS relhasrules, + false AS relhastriggers, + false AS relhassubclass, + false AS relrowsecurity, + false AS relforcerowsecurity, + true AS relispopulated, + NULL AS relreplident, + false AS relispartition, + 0 AS relrewrite, + 0 AS relfrozenxid, + NULL AS relminmxid, + NULL AS relacl, + NULL AS reloptions, + NULL AS relpartbound, + database_name + FROM duckdb_views() + WHERE view_name NOT IN (%s) + UNION ALL + SELECT + sequence_oid AS oid, + sequence_name AS relname, + schema_oid AS relnamespace, + 0 AS reltype, + 0 AS reloftype, + 0 AS relowner, + 0 AS relam, + 0 AS relfilenode, + 0 AS reltablespace, + 0 AS relpages, + 0 AS reltuples, + 0 AS relallvisible, + 0 AS reltoastrelid, + 0::BIGINT AS reltoastidxid, + false AS relhasindex, + false AS relisshared, + CASE WHEN temporary THEN 't' ELSE 'p' END AS relpersistence, + 'S' AS relkind, + 0 AS relnatts, + 0 AS relchecks, + false AS relhasoids, + false AS relhaspkey, + false AS relhasrules, + false AS relhastriggers, + false AS relhassubclass, + false AS relrowsecurity, + false AS relforcerowsecurity, + true AS relispopulated, + NULL AS relreplident, + false AS relispartition, + 0 AS relrewrite, + 0 AS relfrozenxid, + NULL AS relminmxid, + NULL AS relacl, + NULL AS reloptions, + NULL AS relpartbound, + database_name + FROM duckdb_sequences() + WHERE sequence_name NOT IN (%s) + UNION ALL + SELECT + index_oid AS oid, + index_name AS relname, + schema_oid AS relnamespace, + 0 AS reltype, + 0 AS reloftype, + 0 AS relowner, + 0 AS relam, + 0 AS relfilenode, + 0 AS reltablespace, + 0 AS relpages, + 0 AS reltuples, + 0 AS relallvisible, + 0 AS reltoastrelid, + 0::BIGINT AS reltoastidxid, + false AS relhasindex, + false AS relisshared, + 't' AS relpersistence, + 'i' AS relkind, + NULL AS relnatts, + 0 AS relchecks, + false AS relhasoids, + false AS relhaspkey, + false AS relhasrules, + false AS relhastriggers, + false AS relhassubclass, + false AS relrowsecurity, + false AS relforcerowsecurity, + true AS relispopulated, + NULL AS relreplident, + false AS relispartition, + 0 AS relrewrite, + 0 AS relfrozenxid, + NULL AS relminmxid, + NULL AS relacl, + NULL AS reloptions, + NULL AS relpartbound, + database_name + FROM duckdb_indexes() + WHERE index_name NOT IN (%s) + ) + SELECT + oid, relname, relnamespace, reltype, reloftype, relowner, relam, + relfilenode, reltablespace, relpages, reltuples, relallvisible, + reltoastrelid, reltoastidxid, relhasindex, relisshared, + relpersistence, relkind, relnatts, relchecks, relhasoids, + relhaspkey, relhasrules, relhastriggers, relhassubclass, + relrowsecurity, relforcerowsecurity, relispopulated, relreplident, + relispartition, relrewrite, relfrozenxid, relminmxid, relacl, + reloptions, relpartbound + FROM relations r + CROSS JOIN active_catalog ac + WHERE r.database_name = ac.catalog + `, internalNames, internalNames, internalNames, internalNames) +} + +func buildSessionPgNamespaceViewSQL() string { + return ` + CREATE OR REPLACE VIEW main.pg_namespace AS + WITH active_catalog AS ( + SELECT current_database() AS catalog + ), + user_namespaces AS ( + SELECT schema_oid AS oid, schema_name, database_name FROM duckdb_tables() + UNION + SELECT schema_oid AS oid, schema_name, database_name FROM duckdb_views() + UNION + SELECT schema_oid AS oid, schema_name, database_name FROM duckdb_sequences() + UNION + SELECT schema_oid AS oid, schema_name, database_name FROM duckdb_indexes() + ) + SELECT DISTINCT + oid, + CASE WHEN schema_name = 'main' THEN 'public' ELSE schema_name END AS nspname, + CASE WHEN schema_name = 'main' THEN 6171::BIGINT ELSE 10::BIGINT END AS nspowner, + NULL AS nspacl + FROM user_namespaces n + CROSS JOIN active_catalog ac + WHERE n.database_name = ac.catalog + AND n.schema_name NOT LIKE '__ducklake_metadata_%' + UNION ALL + SELECT 11::BIGINT AS oid, 'pg_catalog' AS nspname, 10::BIGINT AS nspowner, NULL AS nspacl + UNION ALL + SELECT 12::BIGINT AS oid, 'information_schema' AS nspname, 10::BIGINT AS nspowner, NULL AS nspacl + UNION ALL + SELECT 99::BIGINT AS oid, 'pg_toast' AS nspname, 10::BIGINT AS nspowner, NULL AS nspacl + ` +} + +func pgTypeOIDCaseSQL(dataTypeExpr, fallbackExpr string) string { + template := `CASE + WHEN {dt} LIKE 'DECIMAL%' OR {dt} LIKE 'NUMERIC%' THEN 1700::UINTEGER + WHEN {dt} = 'INTEGER' THEN 23::UINTEGER + WHEN {dt} = 'BIGINT' THEN 20::UINTEGER + WHEN {dt} = 'SMALLINT' THEN 21::UINTEGER + WHEN {dt} = 'TINYINT' THEN 21::UINTEGER + WHEN {dt} = 'HUGEINT' THEN 1700::UINTEGER + WHEN {dt} = 'UBIGINT' THEN 1700::UINTEGER + WHEN {dt} = 'UINTEGER' THEN 20::UINTEGER + WHEN {dt} = 'USMALLINT' THEN 23::UINTEGER + WHEN {dt} = 'UTINYINT' THEN 21::UINTEGER + WHEN {dt} = 'FLOAT' OR {dt} = 'DOUBLE' THEN 701::UINTEGER + WHEN {dt} = 'REAL' THEN 700::UINTEGER + WHEN {dt} = 'VARCHAR' THEN 1043::UINTEGER + WHEN {dt} = 'TEXT' THEN 25::UINTEGER + WHEN {dt} = 'CHAR' OR {dt} = 'BPCHAR' THEN 1042::UINTEGER + WHEN {dt} = 'BOOLEAN' THEN 16::UINTEGER + WHEN {dt} = 'BLOB' OR {dt} = 'BYTEA' THEN 17::UINTEGER + WHEN {dt} = 'DATE' THEN 1082::UINTEGER + WHEN {dt} = 'TIME' THEN 1083::UINTEGER + WHEN {dt} = 'TIMESTAMP' THEN 1114::UINTEGER + WHEN {dt} LIKE 'TIMESTAMP WITH TIME ZONE%' THEN 1184::UINTEGER + WHEN {dt} LIKE 'TIME WITH TIME ZONE%' THEN 1266::UINTEGER + WHEN {dt} = 'INTERVAL' THEN 1186::UINTEGER + WHEN {dt} = 'UUID' THEN 2950::UINTEGER + WHEN {dt} = 'BIT' THEN 1560::UINTEGER + WHEN {dt} = 'JSON' THEN 114::UINTEGER + WHEN {dt} = 'INTEGER[]' THEN 1007::UINTEGER + WHEN {dt} = 'BIGINT[]' THEN 1016::UINTEGER + WHEN {dt} = 'SMALLINT[]' THEN 1005::UINTEGER + WHEN {dt} = 'VARCHAR[]' THEN 1015::UINTEGER + WHEN {dt} = 'TEXT[]' THEN 1009::UINTEGER + WHEN {dt} = 'BOOLEAN[]' THEN 1000::UINTEGER + WHEN {dt} = 'FLOAT[]' OR {dt} = 'DOUBLE[]' THEN 1022::UINTEGER + WHEN {dt} = 'REAL[]' THEN 1021::UINTEGER + WHEN {dt} = 'DATE[]' THEN 1182::UINTEGER + WHEN {dt} = 'TIMESTAMP[]' THEN 1115::UINTEGER + WHEN {dt} LIKE 'NUMERIC%[]' OR {dt} LIKE 'DECIMAL%[]' THEN 1231::UINTEGER + WHEN {dt} = 'UUID[]' THEN 2951::UINTEGER + WHEN {dt} = 'INTERVAL[]' THEN 1187::UINTEGER + WHEN {dt} = 'BLOB[]' OR {dt} = 'BYTEA[]' THEN 1001::UINTEGER + WHEN {dt} LIKE 'STRUCT%' THEN 2249::UINTEGER + ELSE {fallback}::UINTEGER + END` + return strings.NewReplacer("{dt}", dataTypeExpr, "{fallback}", fallbackExpr).Replace(template) +} + +func buildSessionPgAttributeViewSQL() string { + nativeTypeOID := pgTypeOIDCaseSQL("UPPER(dc.data_type)", "a.atttypid") + loadedTypeOID := pgTypeOIDCaseSQL("UPPER(im.data_type)", "1043") + return fmt.Sprintf(` + CREATE OR REPLACE VIEW main.pg_attribute AS + WITH native_attributes AS ( + SELECT + a.attrelid::UINTEGER AS attrelid, + a.attname, + %s AS atttypid, + a.attstattarget, + a.attlen, + a.attnum::SMALLINT AS attnum, + a.attndims::SMALLINT AS attndims, + a.attcacheoff, + CASE + WHEN (dc.data_type LIKE 'DECIMAL%%' OR dc.data_type LIKE 'NUMERIC%%') AND a.atttypmod > 0 THEN + (((a.atttypmod / 1000)::INTEGER << 16) | ((a.atttypmod %% 1000)::INTEGER + 4))::INTEGER + ELSE a.atttypmod + END AS atttypmod, + a.attbyval, + a.attalign, + a.attstorage, + a.attcompression, + a.attnotnull, + a.atthasdef, + a.atthasmissing, + a.attidentity, + a.attgenerated, + a.attisdropped, + a.attislocal, + a.attinhcount::INTEGER AS attinhcount, + a.attcollation::UINTEGER AS attcollation, + a.attacl, + a.attoptions, + a.attfdwoptions, + a.attmissingval + FROM pg_catalog.pg_attribute a + JOIN main.pg_class_full c ON c.oid = a.attrelid + JOIN main.pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN duckdb_columns() dc ON dc.table_oid = a.attrelid AND dc.column_name = a.attname + WHERE NOT ( + current_database() = 'iceberg' + AND a.attname = '__' + ) + AND NOT ( + current_database() = 'iceberg' + AND EXISTS ( + SELECT 1 + FROM main.__duckgres_iceberg_column_metadata im + WHERE im.table_schema = n.nspname + AND im.table_name = c.relname + ) + ) + ), + loaded_iceberg_attributes AS ( + SELECT + c.oid::UINTEGER AS attrelid, + im.column_name AS attname, + %s AS atttypid, + -1::INTEGER AS attstattarget, + -1::INTEGER AS attlen, + im.ordinal_position::SMALLINT AS attnum, + 0::SMALLINT AS attndims, + 0::INTEGER AS attcacheoff, + -1::INTEGER AS atttypmod, + false AS attbyval, + 'i' AS attalign, + 'x' AS attstorage, + NULL AS attcompression, + (im.is_nullable = 'NO') AS attnotnull, + false AS atthasdef, + false AS atthasmissing, + '' AS attidentity, + '' AS attgenerated, + false AS attisdropped, + true AS attislocal, + 0::INTEGER AS attinhcount, + 0::UINTEGER AS attcollation, + NULL AS attacl, + NULL AS attoptions, + NULL AS attfdwoptions, + NULL AS attmissingval + FROM main.__duckgres_iceberg_column_metadata im + JOIN main.pg_namespace n ON n.nspname = im.table_schema + JOIN main.pg_class_full c ON c.relnamespace = n.oid AND c.relname = im.table_name + WHERE current_database() = 'iceberg' + ) + SELECT * FROM native_attributes + UNION ALL + SELECT * FROM loaded_iceberg_attributes + `, nativeTypeOID, loadedTypeOID) +} + +// buildSessionPgTablesViewSQL builds the catalog-scoped pg_catalog.pg_tables +// compat view. DuckDB's native pg_tables spans EVERY attached catalog and +// ignores current_database(), so on a dual-catalog worker (DuckLake + Iceberg +// attached on one connection) a bare pg_tables would leak the other catalog's +// table names into the session. This view sources duckdb_tables() filtered to +// current_database() so an Iceberg session only sees Iceberg tables and a +// DuckLake session only sees DuckLake tables. Column shape mirrors DuckDB's +// native pg_tables (schemaname, tablename, tableowner, tablespace, hasindexes, +// hasrules, hastriggers) so clients see no change beyond the scoping. +func buildSessionPgTablesViewSQL() string { + internalNames := internalCompatRelationNamesSQL() + return fmt.Sprintf(` + CREATE OR REPLACE VIEW main.pg_tables AS + WITH active_catalog AS ( + SELECT current_database() AS catalog + ) + SELECT + CASE WHEN t.schema_name = 'main' THEN 'public' ELSE t.schema_name END AS schemaname, + t.table_name AS tablename, + 'duckdb' AS tableowner, + NULL AS tablespace, + (t.index_count > 0) AS hasindexes, + false AS hasrules, + false AS hastriggers + FROM duckdb_tables() t + CROSS JOIN active_catalog ac + WHERE t.database_name = ac.catalog + AND t.schema_name NOT LIKE '__ducklake_metadata_%%' + AND t.table_name NOT IN (%s) + `, internalNames) +} + +// buildSessionPgViewsViewSQL builds the catalog-scoped pg_catalog.pg_views +// compat view (same cross-catalog-leak rationale as buildSessionPgTablesViewSQL). +// Sources duckdb_views() filtered to current_database(); the compat views +// themselves live in memory.main and are excluded by name so a memory-catalog +// session does not surface them as user views. +func buildSessionPgViewsViewSQL() string { + internalNames := internalCompatRelationNamesSQL() + return fmt.Sprintf(` + CREATE OR REPLACE VIEW main.pg_views AS + WITH active_catalog AS ( + SELECT current_database() AS catalog + ) + SELECT + CASE WHEN v.schema_name = 'main' THEN 'public' ELSE v.schema_name END AS schemaname, + v.view_name AS viewname, + 'duckdb' AS viewowner, + v.sql AS definition + FROM duckdb_views() v + CROSS JOIN active_catalog ac + WHERE v.database_name = ac.catalog + AND v.schema_name NOT LIKE '__ducklake_metadata_%%' + AND v.view_name NOT IN (%s) + `, internalNames) +} + +// buildSessionPgSequencesViewSQL builds the catalog-scoped pg_catalog.pg_sequences +// compat view (same cross-catalog-leak rationale as buildSessionPgTablesViewSQL). +// Sources duckdb_sequences() filtered to current_database(). DuckDB's +// duckdb_sequences() does not expose data_type/cache_size, so those are +// synthesized to match PostgreSQL's pg_sequences shape. +func buildSessionPgSequencesViewSQL() string { + return ` + CREATE OR REPLACE VIEW main.pg_sequences AS + WITH active_catalog AS ( + SELECT current_database() AS catalog + ) + SELECT + CASE WHEN s.schema_name = 'main' THEN 'public' ELSE s.schema_name END AS schemaname, + s.sequence_name AS sequencename, + 'duckdb' AS sequenceowner, + 'bigint' AS data_type, + s.start_value AS start_value, + s.min_value AS min_value, + s.max_value AS max_value, + s.increment_by AS increment_by, + s.cycle AS cycle, + NULL AS cache_size, + s.last_value AS last_value + FROM duckdb_sequences() s + CROSS JOIN active_catalog ac + WHERE s.database_name = ac.catalog + AND s.schema_name NOT LIKE '__ducklake_metadata_%' + ` +} + func buildSessionInformationSchemaColumnsViewSQL() string { return ` CREATE OR REPLACE VIEW main.information_schema_columns_compat AS @@ -459,6 +928,7 @@ func buildSessionInformationSchemaTablesViewSQL() string { 'pg_publication_tables', 'pg_roles', 'pg_rules', 'pg_statistic_ext', 'pg_matviews', 'pg_stat_user_tables', 'pg_statio_user_tables', 'pg_stat_statements', 'pg_stat_activity', 'pg_partitioned_table', 'pg_rewrite', 'pg_attribute', + 'pg_tables', 'pg_views', 'pg_sequences', 'information_schema_columns_compat', 'information_schema_tables_compat', 'information_schema_schemata_compat', 'information_schema_views_compat' ) @@ -532,6 +1002,7 @@ func buildSessionInformationSchemaViewsViewSQL() string { 'pg_publication_tables', 'pg_roles', 'pg_rules', 'pg_statistic_ext', 'pg_matviews', 'pg_stat_user_tables', 'pg_statio_user_tables', 'pg_stat_statements', 'pg_stat_activity', 'pg_partitioned_table', 'pg_rewrite', 'pg_attribute', + 'pg_tables', 'pg_views', 'pg_sequences', 'information_schema_columns_compat', 'information_schema_tables_compat', 'information_schema_schemata_compat', 'information_schema_views_compat' ) diff --git a/transpiler/transform/pgcatalog.go b/transpiler/transform/pgcatalog.go index 6fe2d8f2..57121290 100644 --- a/transpiler/transform/pgcatalog.go +++ b/transpiler/transform/pgcatalog.go @@ -33,41 +33,44 @@ func NewPgCatalogTransformWithConfig(duckLakeMode bool) *PgCatalogTransform { return &PgCatalogTransform{ DuckLakeMode: duckLakeMode, ViewMappings: map[string]string{ - "pg_class": "pg_class_full", - "pg_database": "pg_database", - "pg_namespace": "pg_namespace", - "pg_collation": "pg_collation", - "pg_policy": "pg_policy", - "pg_roles": "pg_roles", - "pg_statistic_ext": "pg_statistic_ext", - "pg_publication_tables": "pg_publication_tables", - "pg_rules": "pg_rules", - "pg_publication": "pg_publication", - "pg_publication_rel": "pg_publication_rel", - "pg_inherits": "pg_inherits", - "pg_matviews": "pg_matviews", - "pg_stat_user_tables": "pg_stat_user_tables", - "pg_statio_user_tables": "pg_statio_user_tables", - "pg_stat_statements": "pg_stat_statements", - "pg_partitioned_table": "pg_partitioned_table", - "pg_type": "pg_type", - "pg_attribute": "pg_attribute", - "pg_constraint": "pg_constraint", - "pg_enum": "pg_enum", - "pg_indexes": "pg_indexes", - "pg_stat_activity": "pg_stat_activity", - "pg_shdescription": "pg_shdescription", - "pg_auth_members": "pg_auth_members", - "pg_opclass": "pg_opclass", - "pg_conversion": "pg_conversion", - "pg_language": "pg_language", - "pg_extension": "pg_extension", - "pg_foreign_server": "pg_foreign_server", - "pg_foreign_data_wrapper": "pg_foreign_data_wrapper", - "pg_foreign_table": "pg_foreign_table", - "pg_trigger": "pg_trigger", - "pg_locks": "pg_locks", - "pg_rewrite": "pg_rewrite", + "pg_class": "pg_class_full", + "pg_database": "pg_database", + "pg_namespace": "pg_namespace", + "pg_tables": "pg_tables", + "pg_views": "pg_views", + "pg_sequences": "pg_sequences", + "pg_collation": "pg_collation", + "pg_policy": "pg_policy", + "pg_roles": "pg_roles", + "pg_statistic_ext": "pg_statistic_ext", + "pg_publication_tables": "pg_publication_tables", + "pg_rules": "pg_rules", + "pg_publication": "pg_publication", + "pg_publication_rel": "pg_publication_rel", + "pg_inherits": "pg_inherits", + "pg_matviews": "pg_matviews", + "pg_stat_user_tables": "pg_stat_user_tables", + "pg_statio_user_tables": "pg_statio_user_tables", + "pg_stat_statements": "pg_stat_statements", + "pg_partitioned_table": "pg_partitioned_table", + "pg_type": "pg_type", + "pg_attribute": "pg_attribute", + "pg_constraint": "pg_constraint", + "pg_enum": "pg_enum", + "pg_indexes": "pg_indexes", + "pg_stat_activity": "pg_stat_activity", + "pg_shdescription": "pg_shdescription", + "pg_auth_members": "pg_auth_members", + "pg_opclass": "pg_opclass", + "pg_conversion": "pg_conversion", + "pg_language": "pg_language", + "pg_extension": "pg_extension", + "pg_foreign_server": "pg_foreign_server", + "pg_foreign_data_wrapper": "pg_foreign_data_wrapper", + "pg_foreign_table": "pg_foreign_table", + "pg_trigger": "pg_trigger", + "pg_locks": "pg_locks", + "pg_rewrite": "pg_rewrite", }, Functions: map[string]bool{ "pg_get_userbyid": true, @@ -162,31 +165,31 @@ func NewPgCatalogTransformWithConfig(duckLakeMode bool) *PgCatalogTransform { "quote_nullable": true, // Quote nullable value // ClickHouse SQL macros (server/chsql.go initClickHouseMacros) - "tostring": true, - "toint32": true, - "toint64": true, - "tofloat": true, - "toint32ornull": true, - "toint32orzero": true, - "intdiv": true, - "modulo": true, - "empty": true, - "notempty": true, - "splitbychar": true, - "lengthutf8": true, - "toyear": true, - "tomonth": true, - "todayofmonth": true, - "toyyyymmdd": true, - "toyyyymm": true, - "protocol": true, - "domain": true, - "topleveldomain": true, - "ipv4numtostring": true, + "tostring": true, + "toint32": true, + "toint64": true, + "tofloat": true, + "toint32ornull": true, + "toint32orzero": true, + "intdiv": true, + "modulo": true, + "empty": true, + "notempty": true, + "splitbychar": true, + "lengthutf8": true, + "toyear": true, + "tomonth": true, + "todayofmonth": true, + "toyyyymmdd": true, + "toyyyymm": true, + "protocol": true, + "domain": true, + "topleveldomain": true, + "ipv4numtostring": true, "jsonextractstring": true, - "jsonhas": true, - "generateuuidv4": true, - "ifnull": true, + "jsonhas": true, + "generateuuidv4": true, + "ifnull": true, }, } } diff --git a/transpiler/transpiler.go b/transpiler/transpiler.go index 95fe9337..348a3460 100644 --- a/transpiler/transpiler.go +++ b/transpiler/transpiler.go @@ -335,6 +335,7 @@ func Classify(sql string, cfg Config) Classification { "PG_INDEX", "PG_CONSTRAINT", "PG_DATABASE", "PG_ROLES", "PG_STAT", "PG_STATIO", "PG_COLLATION", "PG_POLICY", "PG_PUBLICATION", "PG_INHERITS", "PG_MATVIEWS", "PG_ENUM", "PG_INDEXES", + "PG_TABLES", "PG_VIEWS", "PG_SEQUENCES", "PG_ATTRDEF", "PG_AM", "PG_DESCRIPTION", "PG_DEPEND", "PG_SHDESCRIPTION", "PG_PROC", "PG_EXTENSION", "PG_AVAILABLE_EXTENSIONS", "PG_SETTINGS", diff --git a/transpiler/transpiler_test.go b/transpiler/transpiler_test.go index 0600cf5f..8c5a73e4 100644 --- a/transpiler/transpiler_test.go +++ b/transpiler/transpiler_test.go @@ -90,6 +90,23 @@ func TestTranspile_PgCatalog(t *testing.T) { contains: "memory.main.pg_database", excludes: "pg_catalog", }, + { + name: "pg_catalog.pg_tables -> memory.main.pg_tables", + input: "SELECT * FROM pg_catalog.pg_tables", + contains: "memory.main.pg_tables", + excludes: "pg_catalog", + }, + { + name: "unqualified pg_views -> memory.main.pg_views", + input: "SELECT * FROM pg_views", + contains: "memory.main.pg_views", + }, + { + name: "pg_catalog.pg_sequences -> memory.main.pg_sequences", + input: "SELECT * FROM pg_catalog.pg_sequences", + contains: "memory.main.pg_sequences", + excludes: "pg_catalog", + }, { name: "pg_catalog.pg_statio_user_tables -> memory.main.pg_statio_user_tables", input: "SELECT * FROM pg_catalog.pg_statio_user_tables",