Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions duckdbservice/arrow_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ type contextQueryer interface {
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}

// isExplainQuery reports whether the (already upper-cased) query is an EXPLAIN
// statement, i.e. starts with the EXPLAIN keyword followed by a delimiter.
func isExplainQuery(upper string) bool {
s := strings.TrimSpace(upper)
const kw = "EXPLAIN"
if !strings.HasPrefix(s, kw) {
return false
}
if len(s) == len(kw) {
return true
}
switch s[len(kw)] {
case ' ', '\t', '\n', '\r', '(':
return true
}
return false
}

func isNil(i contextQueryer) bool {
if i == nil {
return true
Expand All @@ -89,6 +107,20 @@ func GetQuerySchema(ctx context.Context, db contextQueryer, query string, tx con
q := strings.TrimRight(strings.TrimSpace(query), ";")
queryWithLimit := q
upper := strings.ToUpper(q)
// EXPLAIN [ANALYZE] returns a fixed single-column textual plan. We must NOT
// execute it to discover its schema: EXPLAIN ANALYZE runs the statement to
// gather statistics, so executing it here as a schema probe would run (and,
// for a write, mutate) the statement a second time on top of the real DoGet
// execution. Return a synthetic schema without executing.
if isExplainQuery(upper) {
name := "physical_plan"
if strings.Contains(upper, "ANALYZE") {
name = "analyzed_plan"
}
return arrow.NewSchema([]arrow.Field{
{Name: name, Type: arrowmap.DuckDBTypeToArrow("VARCHAR"), Nullable: true},
}, nil), nil
}
// Only append LIMIT 0 for SELECT/WITH/VALUES/TABLE statements.
// SHOW, DESCRIBE, EXPLAIN, PRAGMA, CALL etc. don't support LIMIT.
if !strings.Contains(upper, "LIMIT") && arrowmap.SupportsLimit(upper) {
Expand Down
48 changes: 48 additions & 0 deletions duckdbservice/arrow_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,54 @@ func TestNestedTypesRoundTrip(t *testing.T) {
}
}

func TestGetQuerySchemaExplainDoesNotExecute(t *testing.T) {
// Regression: GetQuerySchema used to execute EXPLAIN ANALYZE to learn its
// schema, which for a write mutates — and DoGet then executes it again,
// double-inserting. EXPLAIN must now yield a synthetic schema without running.
db, err := sql.Open("duckdb", "")
if err != nil {
t.Fatalf("failed to open DuckDB: %v", err)
}
defer func() { _ = db.Close() }()

if _, err := db.Exec("CREATE TABLE t (id INTEGER)"); err != nil {
t.Fatalf("create table: %v", err)
}

cases := []struct {
name string
query string
wantCol string
}{
{"explain select", "EXPLAIN SELECT 1", "physical_plan"},
{"explain analyze insert", "EXPLAIN ANALYZE INSERT INTO t VALUES (1)", "analyzed_plan"},
{"explain analyze parens", "EXPLAIN (ANALYZE) INSERT INTO t VALUES (2)", "analyzed_plan"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
schema, err := GetQuerySchema(context.Background(), db, tc.query, nil)
if err != nil {
t.Fatalf("GetQuerySchema(%q) error: %v", tc.query, err)
}
if schema.NumFields() != 1 {
t.Fatalf("GetQuerySchema(%q) = %d fields, want 1", tc.query, schema.NumFields())
}
if got := schema.Field(0).Name; got != tc.wantCol {
t.Errorf("column name = %q, want %q", got, tc.wantCol)
}
})
}

// The schema probes above must NOT have inserted any rows.
var n int
if err := db.QueryRow("SELECT count(*) FROM t").Scan(&n); err != nil {
t.Fatalf("count: %v", err)
}
if n != 0 {
t.Errorf("EXPLAIN ANALYZE schema probe executed the write: %d rows, want 0", n)
}
}

func TestGetQuerySchemaTrailingSemicolon(t *testing.T) {
// Regression test: queries ending with ";" caused "syntax error at or near LIMIT"
// because GetQuerySchema appended " LIMIT 0" after the semicolon, producing "; LIMIT 0".
Expand Down
121 changes: 107 additions & 14 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type preparedStmt struct {
cursorName string // Cursor name
cursorQuery string // Transpiled inner SELECT (for DECLARE)
fetchCount int64 // FETCH row count
warnings []string // Transpiler warnings to surface as NoticeResponse at Execute
}

type portal struct {
Expand Down Expand Up @@ -450,6 +451,13 @@ func classifyErrorCode(err error) string {
// classifiers below apply; otherwise every Iceberg/DuckLake worker error
// would fall through to XX000 and the client would see the raw rpc string.
msg := unwrapFlightError(err.Error())
// Iceberg schema-evolution failure: after a column is dropped (or other
// schema churn) DuckDB's iceberg extension can fail every scan with
// "Tried to scan a snapshot created with a newer schema id ...". Surface it
// as feature_not_supported with an actionable message rather than XX000.
if isSchemaEvolutionErrorMsg(msg) {
return "0A000"
}
switch {
case strings.HasPrefix(msg, "Catalog Error:"):
return catalogErrorCode(msg)
Expand Down Expand Up @@ -496,6 +504,24 @@ func transformErrorSQLState(err error) string {
return "42704"
}

// isSchemaEvolutionErrorMsg reports whether an (already Flight-unwrapped) DuckDB
// error message is the Iceberg "newer schema id" scan failure that a DROP COLUMN
// can leave a table in. Matched on a stable substring; if the wording changes
// the error simply falls back to its generic class (still safe).
func isSchemaEvolutionErrorMsg(msg string) bool {
return strings.Contains(msg, "newer schema id")
}

// friendlyExecError rewrites known-cryptic DuckDB/Iceberg execution errors into
// actionable client messages. Returns the original message unchanged otherwise.
func friendlyExecError(err error) string {
msg := err.Error()
if isSchemaEvolutionErrorMsg(unwrapFlightError(msg)) {
return "reading this table failed because a column was dropped: DROP COLUMN is not safely supported on this catalog (it can leave the table unreadable after schema changes); recreate the table instead"
}
return msg
}

// unwrapFlightError recovers the underlying error message from an Arrow Flight /
// gRPC wrapper. gRPC errors stringify as "… rpc error: code = X desc = <message>",
// and the control plane further prefixes worker failures with "flight execute
Expand Down Expand Up @@ -1494,6 +1520,12 @@ func (c *clientConn) handleQuery(body []byte) error {
return nil
}

// Surface any transpiler warnings (e.g. an unenforced constraint stripped on a
// lake catalog) as NoticeResponse before the command result.
for _, w := range result.Warnings {
c.sendNotice("WARNING", "01000", w)
}

// Handle ignored SET parameters
if result.IsIgnoredSet {
slog.Debug("Ignoring PostgreSQL-specific SET.", "user", c.username, "query", query)
Expand Down Expand Up @@ -1587,7 +1619,7 @@ func (c *clientConn) handleQuery(body []byte) error {
}
if err != nil {
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand Down Expand Up @@ -1675,7 +1707,7 @@ func (c *clientConn) executeQueryDirect(query, cmdType string) error {
if err != nil {
queryFinalErr = err
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand Down Expand Up @@ -1869,7 +1901,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
if err != nil {
queryFinalErr = err
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand All @@ -1887,7 +1919,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
if err != nil {
queryFinalErr = err
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if !c.isCallerCancellation(err) {
c.logQueryError(query, err)
}
Expand All @@ -1902,7 +1934,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
if err != nil {
queryFinalErr = err
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if !c.isCallerCancellation(err) {
c.logQueryError(query, err)
}
Expand Down Expand Up @@ -1947,7 +1979,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
if err := rows.Scan(valuePtrs...); err != nil {
queryFinalErr = err
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if !c.isCallerCancellation(err) {
c.logQueryError(query, err)
}
Expand All @@ -1972,7 +2004,7 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st
if err := rows.Err(); err != nil {
queryFinalErr = err
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errCode = "57014"
errMsg = "canceling statement due to user request"
Expand Down Expand Up @@ -2294,7 +2326,7 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr
if err != nil {
queryFinalErr = err
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand Down Expand Up @@ -2346,7 +2378,7 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr
if err != nil {
queryFinalErr = err
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand Down Expand Up @@ -3127,6 +3159,41 @@ func isWithDML(query string) bool {
strings.HasPrefix(outer, "DELETE")
}

// isExplainStmt reports whether the query is an EXPLAIN statement (the EXPLAIN
// keyword followed by a space or '('). Used to avoid executing EXPLAIN at
// Describe time — EXPLAIN ANALYZE of a write mutates, and a describe-probe
// execution would run it a second time.
func isExplainStmt(query string) bool {
upper := strings.ToUpper(stripLeadingNoise(query))
const kw = "EXPLAIN"
if !strings.HasPrefix(upper, kw) {
return false
}
if len(upper) == len(kw) {
return true
}
switch upper[len(kw)] {
case ' ', '\t', '\n', '\r', '(':
return true
}
return false
}

// explainPlanColumn returns the single column name DuckDB uses for an EXPLAIN
// result: "analyzed_plan" for EXPLAIN ANALYZE, "physical_plan" otherwise.
func explainPlanColumn(query string) string {
if strings.Contains(strings.ToUpper(query), "ANALYZE") {
return "analyzed_plan"
}
return "physical_plan"
}

// staticColumnType is a minimal ColumnTyper reporting a fixed DuckDB type name,
// used to synthesize a RowDescription without executing a query.
type staticColumnType string

func (s staticColumnType) DatabaseTypeName() string { return string(s) }

// skipBalancedParens advances past a parenthesized group in an uppercased SQL
// string. i must point to the character immediately after the opening '('.
// It tracks paren depth while correctly skipping SQL constructs that may
Expand Down Expand Up @@ -5338,6 +5405,7 @@ func (c *clientConn) handleParse(body []byte) {
noOpTag: result.NoOpTag,
statements: result.Statements, // Multi-statement rewrite (writable CTE)
cleanupStatements: result.CleanupStatements, // Cleanup statements
warnings: result.Warnings, // Surfaced as NoticeResponse at Execute
}

slog.Debug("Prepared statement.", "user", c.username, "name", stmtName, "query", query)
Expand Down Expand Up @@ -5535,6 +5603,16 @@ func (c *clientConn) handleDescribe(body []byte) {
return
}

// EXPLAIN [ANALYZE] returns a single textual plan column. Describing it via
// the LIMIT-0 probe below would EXECUTE it — and EXPLAIN ANALYZE of a write
// mutates — so the statement would run at Describe and again at Execute.
// Send a synthetic RowDescription without executing.
if isExplainStmt(ps.query) {
_ = c.sendRowDescription([]string{explainPlanColumn(ps.query)}, []ColumnTyper{staticColumnType("VARCHAR")})
ps.described = true
return
}

// For SELECT, we need to describe the result columns
// The cleanest approach is to add a "WHERE false" or "LIMIT 0" clause
// to get column info without actually running the query
Expand Down Expand Up @@ -5639,6 +5717,15 @@ func (c *clientConn) handleDescribe(body []byte) {
return
}

// EXPLAIN [ANALYZE]: synthesize the single plan column without executing
// (see the statement-Describe branch above).
if isExplainStmt(p.stmt.query) {
_ = c.sendRowDescriptionWithFormats([]string{explainPlanColumn(p.stmt.query)}, []ColumnTyper{staticColumnType("VARCHAR")}, p.resultFormats)
p.described = true
p.stmt.described = true
return
}

// For SELECT, we need to describe the result columns
// We'll do a trial query with LIMIT 0 to get column info
args, err := p.decodeParams()
Expand Down Expand Up @@ -5772,6 +5859,12 @@ func (c *clientConn) handleExecute(body []byte) {

slog.Debug("Execute portal.", "user", c.username, "portal", portalName, "params", len(args), "query", p.stmt.query)

// Surface any transpiler warnings (e.g. an unenforced constraint stripped on a
// lake catalog) as NoticeResponse before the command result.
for _, w := range p.stmt.warnings {
c.sendNotice("WARNING", "01000", w)
}

// Check if this is a PostgreSQL-specific SET command that should be ignored
// (determined by transpiler during Parse)
if p.stmt.isIgnoredSet {
Expand Down Expand Up @@ -5861,7 +5954,7 @@ func (c *clientConn) handleExecute(body []byte) {
if err != nil {
queryFinalErr = err
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand Down Expand Up @@ -5913,7 +6006,7 @@ func (c *clientConn) handleExecute(body []byte) {
if err != nil {
queryFinalErr = err
errCode := classifyErrorCode(err)
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errMsg = "canceling statement due to user request"
} else {
Expand Down Expand Up @@ -5987,7 +6080,7 @@ func (c *clientConn) handleExecute(body []byte) {
if err := rows.Err(); err != nil {
queryFinalErr = err
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errCode = "57014"
errMsg = "canceling statement due to user request"
Expand Down Expand Up @@ -6547,7 +6640,7 @@ func (c *clientConn) handleFetchCursor(query string, stmt *pg_query.FetchStmt) e
if cursor.rows == nil {
if err := c.openCursor(cursor); err != nil {
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errCode = "57014"
errMsg = "canceling statement due to user request"
Expand Down Expand Up @@ -6622,7 +6715,7 @@ func (c *clientConn) handleFetchCursor(query string, stmt *pg_query.FetchStmt) e

if err := cursor.rows.Err(); err != nil {
errCode := "42000"
errMsg := err.Error()
errMsg := friendlyExecError(err)
if c.isCallerCancellation(err) {
errCode = "57014"
errMsg = "canceling statement due to user request"
Expand Down
Loading
Loading