diff --git a/c/driver_manager/adbc_driver_manager.cc b/c/driver_manager/adbc_driver_manager.cc index be20a9e3e0..9ac56c83d1 100644 --- a/c/driver_manager/adbc_driver_manager.cc +++ b/c/driver_manager/adbc_driver_manager.cc @@ -16,11 +16,19 @@ // under the License. #if defined(_WIN32) -#include // Must come first - +// These version macros gate which Win32 APIs the SDK headers declare. They MUST +// be set before is included -- once windows.h pulls in winnt.h, the +// internal API-availability macros are fixed and later #defines have no effect. +// In particular, SHGetKnownFolderPath (used below) requires _WIN32_WINNT >= 0x0600 +// (Vista). Without this, builds with toolchains that default _WIN32_WINNT below +// Vista (e.g. TDM-GCC 10.x) fail with "SHGetKnownFolderPath was not declared". +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0A00 // Windows 10 +#endif #ifndef NTDDI_VERSION #define NTDDI_VERSION 0x0A00000C // For SHGetKnownFolderPath in ShlObj_core.h in ShlObj.h #endif +#include // Must come first #include #include diff --git a/go/adbc/adbc.go b/go/adbc/adbc.go index d66e143a33..7e85352b82 100644 --- a/go/adbc/adbc.go +++ b/go/adbc/adbc.go @@ -264,6 +264,28 @@ const ( OptionKeyPassword = "password" // EXPERIMENTAL. Sets/Gets the trace parent on OpenTelemetry traces OptionKeyTelemetryTraceParent = "adbc.telemetry.trace_parent" + // EXPERIMENTAL. Selects the OpenTelemetry traces exporter when the + // driver initializes its tracer provider. Accepts the same values as + // the OpenTelemetry "OTEL_TRACES_EXPORTER" environment variable (see + // the OptionTelemetryExporter constants below: "none", "otlp", + // "console", "adbcfile"). When this option is set on a database it + // takes precedence over the OTEL_TRACES_EXPORTER environment + // variable, which lets operators select an exporter via the ADBC + // driver manager / TOML profile mechanism without having to mutate + // the host process's environment. When neither this option nor the + // environment variable is set, the driver falls back to the + // process-global OpenTelemetry tracer provider. + OptionKeyTelemetryTracesExporter = "adbc.telemetry.traces_exporter" + // EXPERIMENTAL. Selects the on-disk folder used by the "adbcfile" + // traces exporter. When the exporter is "adbcfile" and this option + // is set, rotated trace files are written to the supplied folder + // (which is created if it does not exist) instead of the default + // "/.adbc/traces" path. The option is ignored for + // other exporters; it exists so an operator can route trace files + // to a location their support workflow already collects (e.g. a + // shared diagnostics folder) via the ADBC driver-manager / TOML + // profile mechanism. + OptionKeyTelemetryTracesFolderPath = "adbc.telemetry.traces_folder_path" ) // EXPERIMENTAL. Traces Telemetry exporter option type diff --git a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go index 6567c0008b..14b25f2eb4 100644 --- a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go +++ b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go @@ -729,7 +729,12 @@ func (ts *ErrorDetailsTests) TestBinaryDetails() { ts.Equal(int32(codes.FailedPrecondition), adbcErr.VendorCode) - ts.Equal(2, len(adbcErr.Details)) + // Expected detail set: + // - x-header-bin, x-trailer-bin: server-supplied details + // - grpc_code, grpc_message: canonical gRPC code/message added + // by adbcFromFlightStatusWithDetails for every Flight error + // - statement_id, connection_id: driver-attached correlation IDs + ts.Equal(6, len(adbcErr.Details)) headerFound := false trailerFound := false @@ -745,6 +750,14 @@ func (ts *ErrorDetailsTests) TestBinaryDetails() { ts.NoError(err) ts.Equal([]byte{111, 0, 112}, val) trailerFound = true + case "statement_id", "connection_id": + val, err := wrapper.Serialize() + ts.NoError(err) + ts.NotEmpty(val) + case "grpc_code", "grpc_message": + val, err := wrapper.Serialize() + ts.NoError(err) + ts.NotEmpty(val) default: ts.Failf("Unexpected detail key: %s", wrapper.Key()) } @@ -766,9 +779,21 @@ func (ts *ErrorDetailsTests) TestGetFlightInfo() { ts.Equal(int32(codes.Unknown), adbcErr.VendorCode) - ts.Equal(1, len(adbcErr.Details)) - - wrapper := adbcErr.Details[0] + // Expected detail set: + // - grpc-status-details-bin: one server-supplied protobuf detail + // - grpc_code, grpc_message: canonical gRPC code/message added + // by adbcFromFlightStatusWithDetails for every Flight error + // - statement_id, connection_id: driver-attached correlation IDs + ts.Equal(5, len(adbcErr.Details)) + + var wrapper adbc.ErrorDetail + for _, d := range adbcErr.Details { + if d.Key() == "grpc-status-details-bin" { + wrapper = d + break + } + } + ts.NotNil(wrapper, "grpc-status-details-bin detail not found") ts.Equal("grpc-status-details-bin", wrapper.Key()) raw, err := wrapper.Serialize() @@ -801,9 +826,21 @@ func (ts *ErrorDetailsTests) TestDoGet() { var adbcErr adbc.Error ts.ErrorAs(err, &adbcErr, "Error was: %#v", err) - ts.Equal(1, len(adbcErr.Details)) - - wrapper := adbcErr.Details[0] + // Expected detail set: + // - grpc-status-details-bin: one server-supplied protobuf detail + // - grpc_code, grpc_message: canonical gRPC code/message added + // by adbcFromFlightStatusWithDetails for every Flight error + // - statement_id, connection_id: driver-attached correlation IDs + ts.Equal(5, len(adbcErr.Details)) + + var wrapper adbc.ErrorDetail + for _, d := range adbcErr.Details { + if d.Key() == "grpc-status-details-bin" { + wrapper = d + break + } + } + ts.NotNil(wrapper, "grpc-status-details-bin detail not found") ts.Equal("grpc-status-details-bin", wrapper.Key()) raw, err := wrapper.Serialize() diff --git a/go/adbc/driver/flightsql/flightsql_bulk_ingest.go b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go index 9ae22ba84d..7b12d35fdb 100644 --- a/go/adbc/driver/flightsql/flightsql_bulk_ingest.go +++ b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go @@ -20,6 +20,8 @@ package flightsql import ( "context" "fmt" + "log/slog" + "time" "github.com/apache/arrow-adbc/go/adbc" "github.com/apache/arrow-go/v18/arrow" @@ -111,6 +113,26 @@ func (s *statement) executeIngest(ctx context.Context) (int64, error) { } } + startTime := time.Now() + catalogStr := "" + if s.catalog != nil { + catalogStr = *s.catalog + } + dbSchemaStr := "" + if s.dbSchema != nil { + dbSchemaStr = *s.dbSchema + } + startAttrs := []any{ + slog.String("target_table", s.targetTable), + slog.String("mode", s.ingestMode), + slog.String("catalog", catalogStr), + slog.String("db_schema", dbSchemaStr), + slog.Bool("temporary", s.temporary), + slog.Bool("streamBind", s.streamBind != nil), + slog.Bool("recordBound", s.bound != nil), + } + s.log.InfoContext(ctx, "FlightSQL ExecuteIngest start", startAttrs...) + opts := ingestOptions{ targetTable: s.targetTable, mode: s.ingestMode, @@ -129,6 +151,10 @@ func (s *statement) executeIngest(ctx context.Context) (int64, error) { } else { rdr, err = createRecordReaderFromBatch(s.bound) if err != nil { + s.log.WarnContext(ctx, "FlightSQL ExecuteIngest finished with error", + slog.Duration("duration", time.Since(startTime)), + "err", err, + ) return -1, err } } @@ -138,9 +164,21 @@ func (s *statement) executeIngest(ctx context.Context) (int64, error) { callOpts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts) nRows, err := s.cnxn.cl.ExecuteIngest(ctx, rdr, ingestOpts, callOpts...) + finishAttrs := []any{ + slog.Duration("duration", time.Since(startTime)), + slog.Int64("rowsIngested", nRows), + } + finishAttrs = append(finishAttrs, correlationHeaderAttrs(header)...) + finishAttrs = append(finishAttrs, correlationHeaderAttrs(trailer)...) if err != nil { - return -1, adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteIngest") + wrapped := withOperationIDs( + adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteIngest"), + s.id, s.cnxn.id) + finishAttrs = append(finishAttrs, "err", wrapped) + s.log.WarnContext(ctx, "FlightSQL ExecuteIngest finished with error", finishAttrs...) + return -1, wrapped } + s.log.InfoContext(ctx, "FlightSQL ExecuteIngest finished", finishAttrs...) return nRows, nil } diff --git a/go/adbc/driver/flightsql/flightsql_connection.go b/go/adbc/driver/flightsql/flightsql_connection.go index 4f672d298c..c8c65f0adb 100644 --- a/go/adbc/driver/flightsql/flightsql_connection.go +++ b/go/adbc/driver/flightsql/flightsql_connection.go @@ -23,8 +23,10 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "math" "strings" + "time" "github.com/apache/arrow-adbc/go/adbc" "github.com/apache/arrow-adbc/go/adbc/driver/internal" @@ -55,6 +57,14 @@ type connectionImpl struct { timeouts timeoutOption txn *flightsql.Txn supportInfo support + + // id is a short random identifier assigned at Open time. It is stamped + // onto every log record emitted by this connection (via Logger.With) + id string + + // wall-clock time at which Open() finished initializing + // this connection. + openedAt time.Time } type flightSqlMetadata struct { @@ -222,14 +232,33 @@ var adbcToFlightSQLInfo = map[adbc.InfoCode]flightsql.SqlInfo{ adbc.InfoVendorSubstraitMaxVersion: flightsql.SqlInfoFlightSqlServerSubstraitMaxVersion, } -func doGet(ctx context.Context, cl *flightsql.Client, endpoint *flight.FlightEndpoint, clientCache gcache.Cache, opts ...grpc.CallOption) (rdr *flight.Reader, err error) { +// doGetWithLogger performs DoGet against an endpoint's locations, logging each +// attempt and joining all per-location failures into the returned error so the +// caller can see every location that was tried. logger may be nil. +func doGetWithLogger(ctx context.Context, cl *flightsql.Client, endpoint *flight.FlightEndpoint, clientCache gcache.Cache, logger *slog.Logger, opts ...grpc.CallOption) (rdr *flight.Reader, err error) { + log := safeLogger(logger) if len(endpoint.Location) == 0 { - return cl.DoGet(ctx, endpoint.Ticket, opts...) + log.DebugContext(ctx, "FlightSQL doGet", + "phase", "noLocations", + ) + start := time.Now() + rdr, err = cl.DoGet(ctx, endpoint.Ticket, opts...) + log.DebugContext(ctx, "FlightSQL doGet", + "phase", "defaultClientResult", + "duration", time.Since(start), + "err", err, + ) + return rdr, err } var ( cc interface{} hasFallback bool + // attemptErrors collects every per-location attempt failure so that + // the final error returned to the caller can describe all the + // locations that were attempted (in order) rather than discarding + // information when fallback also fails. + attemptErrors []string ) for _, loc := range endpoint.Location { @@ -238,22 +267,62 @@ func doGet(ctx context.Context, cl *flightsql.Client, endpoint *flight.FlightEnd continue } + start := time.Now() cc, err = clientCache.Get(loc.Uri) if err != nil { + attemptErrors = append(attemptErrors, fmt.Sprintf("clientCache.Get(%q): %s", loc.Uri, err.Error())) + log.WarnContext(ctx, "FlightSQL doGet location attempt failed", + "phase", "clientCacheGet", + "location", loc.Uri, + "duration", time.Since(start), + "err", err, + ) continue } conn := cc.(*flightsql.Client) rdr, err = conn.DoGet(ctx, endpoint.Ticket, opts...) if err != nil { + attemptErrors = append(attemptErrors, fmt.Sprintf("DoGet(%q): %s", loc.Uri, err.Error())) + log.WarnContext(ctx, "FlightSQL doGet location attempt failed", + "phase", "doGet", + "location", loc.Uri, + "duration", time.Since(start), + "err", err, + ) continue } + log.DebugContext(ctx, "FlightSQL doGet succeeded", + "location", loc.Uri, + "duration", time.Since(start), + ) return } if hasFallback { - return cl.DoGet(ctx, endpoint.Ticket, opts...) + start := time.Now() + rdr, err = cl.DoGet(ctx, endpoint.Ticket, opts...) + if err != nil { + attemptErrors = append(attemptErrors, fmt.Sprintf("DoGet(fallback to default client): %s", err.Error())) + log.WarnContext(ctx, "FlightSQL doGet fallback to default client failed", + "duration", time.Since(start), + "err", err, + ) + // Wrap the error with the full attempt history so the diagnostic + // trail is preserved even if the caller does not log err.Error() + // itself (it may only end up surfaced through the C ADBC layer). + return nil, fmt.Errorf("all DoGet attempts failed: %s; final: %w", strings.Join(attemptErrors, "; "), err) + } + log.DebugContext(ctx, "FlightSQL doGet succeeded via default client fallback", + "duration", time.Since(start), + ) + return rdr, nil + } + + if err != nil && len(attemptErrors) > 1 { + err = fmt.Errorf("all %d DoGet location(s) failed: %s; final: %w", + len(attemptErrors), strings.Join(attemptErrors, "; "), err) } return nil, err @@ -642,7 +711,7 @@ func (c *connectionImpl) PrepareDriverInfo(ctx context.Context, infoCodes []adbc // No error, go get the SqlInfo from the server for i, endpoint := range info.Endpoint { var header, trailer metadata.MD - rdr, err := doGet(ctx, c.cl, endpoint, c.clientCache, grpc.Header(&header), grpc.Trailer(&trailer), c.timeouts) + rdr, err := doGetWithLogger(ctx, c.cl, endpoint, c.clientCache, c.Logger, grpc.Header(&header), grpc.Trailer(&trailer), c.timeouts) if err != nil { return adbcFromFlightStatusWithDetails(err, header, trailer, "GetInfo(DoGet): endpoint %d: %s", i, endpoint.Location) } @@ -701,7 +770,14 @@ func (c *connectionImpl) PrepareDriverInfo(ctx context.Context, infoCodes []adbc // Helper function to read and validate a metadata stream func (c *connectionImpl) readInfo(ctx context.Context, expectedSchema *arrow.Schema, info *flight.FlightInfo, opts ...grpc.CallOption) (array.RecordReader, error) { // use a default queueSize for the reader - rdr, err := newRecordReader(ctx, c.db.Alloc, c.cl, info, c.clientCache, 5, opts...) + rdr, err := newRecordReader(ctx, recordReaderConfig{ + alloc: c.db.Alloc, + cl: c.cl, + info: info, + clientCache: c.clientCache, + bufferSize: 5, + logger: c.Logger, + }, opts...) if err != nil { return nil, adbcFromFlightStatus(err, "DoGet") } @@ -898,7 +974,7 @@ func (c *connectionImpl) GetTableSchema(ctx context.Context, catalog *string, db header = metadata.MD{} trailer = metadata.MD{} - rdr, err := doGet(ctx, c.cl, info.Endpoint[0], c.clientCache, c.timeouts, grpc.Header(&header), grpc.Trailer(&trailer)) + rdr, err := doGetWithLogger(ctx, c.cl, info.Endpoint[0], c.clientCache, c.Logger, c.timeouts, grpc.Header(&header), grpc.Trailer(&trailer)) if err != nil { return nil, adbcFromFlightStatusWithDetails(err, header, trailer, "GetTableSchema(DoGet)") } @@ -968,7 +1044,14 @@ func (c *connectionImpl) GetTableTypes(ctx context.Context) (array.RecordReader, return nil, adbcFromFlightStatusWithDetails(err, header, trailer, "GetTableTypes") } - return newRecordReader(ctx, c.db.Alloc, c.cl, info, c.clientCache, 5) + return newRecordReader(ctx, recordReaderConfig{ + alloc: c.db.Alloc, + cl: c.cl, + info: info, + clientCache: c.clientCache, + bufferSize: 5, + logger: c.Logger, + }) } // Commit commits any pending transactions on this connection, it should @@ -1019,6 +1102,13 @@ func (c *connectionImpl) Rollback(ctx context.Context) error { // NewStatement initializes a new statement object tied to this connection func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + id := newRandomID("stmt") + // Build a statement-scoped logger so every record emitted for this + // statement carries both connection_id (inherited from c.Logger via the + // With() called in databaseImpl.Open) and statement_id. The discard + // fallback in safeLogger keeps callers free of nil-checks if no logger + // is wired up by the host. + log := safeLogger(c.Logger).With("statement_id", id) return &statement{ alloc: c.db.Alloc, clientCache: c.clientCache, @@ -1026,6 +1116,8 @@ func (c *connectionImpl) NewStatement() (adbc.Statement, error) { queueSize: 5, timeouts: c.timeouts, cnxn: c, + id: id, + log: log, }, nil } @@ -1118,6 +1210,20 @@ func (c *connectionImpl) Close() error { } } + closeStart := time.Now() + // Snapshot fields needed for the post-close log line before we + // tear down c.cl. We deliberately log "starting" + "finished" + // separately rather than a single "closed" line because the + // underlying CloseSession can block on the server, and operators + // triaging a hung shutdown need to see when the close began. + logger := safeLogger(c.Logger) + connID := c.id + openedAt := c.openedAt + + logger.Info("FlightSQL connection closing", + "connection_id", connID, + ) + ctx := metadata.NewOutgoingContext(context.Background(), c.hdrs) var header, trailer metadata.MD _, err := c.cl.CloseSession(ctx, &flight.CloseSessionRequest{}, grpc.Header(&header), grpc.Trailer(&trailer), c.timeouts) @@ -1134,6 +1240,22 @@ func (c *connectionImpl) Close() error { c.clientCache.Purge() err = c.cl.Close() c.cl = nil + + args := []any{ + "connection_id", connID, + "close_duration", time.Since(closeStart), + } + if !openedAt.IsZero() { + args = append(args, "lifetime", time.Since(openedAt)) + } + if err != nil { + args = append(args, "err", err) + args = append(args, grpcStatusAttrs(err)...) + logger.Info("FlightSQL connection closed with error", args...) + } else { + logger.Info("FlightSQL connection closed", args...) + } + return adbcFromFlightStatus(err, "Close") } @@ -1159,7 +1281,7 @@ func (c *connectionImpl) ReadPartition(ctx context.Context, serializedPartition } ctx = metadata.NewOutgoingContext(ctx, c.hdrs) - rdr, err = doGet(ctx, c.cl, info.Endpoint[0], c.clientCache, c.timeouts) + rdr, err = doGetWithLogger(ctx, c.cl, info.Endpoint[0], c.clientCache, c.Logger, c.timeouts) if err != nil { return nil, adbcFromFlightStatus(err, "ReadPartition(DoGet)") } diff --git a/go/adbc/driver/flightsql/flightsql_database.go b/go/adbc/driver/flightsql/flightsql_database.go index 316ad778be..a38832fa7e 100644 --- a/go/adbc/driver/flightsql/flightsql_database.go +++ b/go/adbc/driver/flightsql/flightsql_database.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "log/slog" "net/url" "strconv" "strings" @@ -367,6 +368,15 @@ func (d *databaseImpl) SetOptionDouble(key string, value float64) error { } func (d *databaseImpl) Close() error { + // Emit a structured log line on close so that connection / database + // lifetime events can be reconstructed end-to-end from the driver's + // log stream alone. This is a no-op release otherwise; resources + // owned by individual connections are released by connectionImpl.Close. + if d.Logger != nil { + d.Logger.Info("FlightSQL database closed", + "target", d.uri.String(), + ) + } return nil } @@ -430,15 +440,47 @@ func getFlightClient(ctx context.Context, loc string, d *databaseImpl, authMiddl var authValue string if d.user != "" || d.pass != "" { + // Emit start/finish logs around basic auth so an operator can + // distinguish "the driver never tried to authenticate" from + // "authentication started but the server never responded" — the + // two have very different root causes (option not threaded vs + // network/TLS/server fault). + authStart := time.Now() + d.Logger.InfoContext(ctx, "FlightSQL basic auth started", + "target", loc, + "user", d.user, + ) var header, trailer metadata.MD ctx, err = cl.Client.AuthenticateBasicToken(ctx, d.user, d.pass, grpc.Header(&header), grpc.Trailer(&trailer), d.timeout) if err != nil { + args := []any{ + "target", loc, + "user", d.user, + "duration", time.Since(authStart), + "err", err, + } + args = append(args, correlationHeaderAttrs(header)...) + args = append(args, correlationHeaderAttrs(trailer)...) + args = append(args, grpcStatusAttrs(err)...) + d.Logger.InfoContext(ctx, "FlightSQL basic auth failed", args...) return nil, adbcFromFlightStatusWithDetails(err, header, trailer, "AuthenticateBasicToken") } if md, ok := metadata.FromOutgoingContext(ctx); ok { authValue = md.Get("Authorization")[0] } + + // Log only the fact + token length, never the token value. The + // length is useful for quick sanity checks (e.g. a zero-length + // token would indicate the server returned an empty + // Authorization header even though AuthenticateBasicToken + // returned no error). + d.Logger.InfoContext(ctx, "FlightSQL basic auth succeeded", + "target", loc, + "user", d.user, + "duration", time.Since(authStart), + "token_length", len(authValue), + ) } if authValue != "" { @@ -453,7 +495,7 @@ type support struct { } func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) { - authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy()} + authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy(), logger: safeLogger(d.Logger)} var cookies flight.CookieMiddleware if d.enableCookies { cookies = flight.NewCookieMiddleware() @@ -480,7 +522,7 @@ func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) { } // use the existing auth token if there is one cl, err := getFlightClient(context.Background(), uri, d, - &bearerAuthMiddleware{hdrs: authMiddle.hdrs.Copy()}, cookieMiddleware) + &bearerAuthMiddleware{hdrs: authMiddle.hdrs.Copy(), logger: safeLogger(d.Logger)}, cookieMiddleware) if err != nil { return nil, err } @@ -510,7 +552,7 @@ func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) { const int32code = 3 for _, endpoint := range info.Endpoint { - rdr, err := doGet(ctx, cl, endpoint, cache, d.timeout) + rdr, err := doGetWithLogger(ctx, cl, endpoint, cache, d.Logger, d.timeout) if err != nil { continue } @@ -549,6 +591,20 @@ func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) { hdrs: make(metadata.MD), timeouts: d.timeout, supportInfo: cnxnSupport, ConnectionImplBase: driverbase.NewConnectionImplBase(&d.DatabaseImplBase), } + // Mint a stable per-connection identifier and wrap the inherited + // logger so that every subsequent log line (and every statement + // created through this connection) carries connection_id. This is the + // primary correlation hook between FlightSQL driver logs and any host + // application (such as Power Query / Mashup) that records its own + // per-connection identifier. + conn.id = newRandomID("conn") + conn.openedAt = time.Now() + conn.Logger = safeLogger(conn.Logger).With("connection_id", conn.id) + conn.Logger.InfoContext(ctx, "FlightSQL connection opened", + "target", d.uri.String(), + "transactionsSupported", cnxnSupport.transactions, + "driver", infoDriverName, + ) return driverbase.NewConnectionBuilder(conn). WithDriverInfoPreparer(conn). @@ -560,6 +616,14 @@ func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) { type bearerAuthMiddleware struct { mutex sync.RWMutex hdrs metadata.MD + // logger, when non-nil, receives an Info-level event every time the + // bearer token is rotated (either because the server returned a new + // Authorization header on a response, or because the driver itself + // explicitly set one via SetHeader). The token value is never + // logged; only the fact of the rotation plus the new token length + // is recorded so an operator can confirm that token-refresh logic + // is firing without exposing credential material in the log stream. + logger *slog.Logger } func (b *bearerAuthMiddleware) StartCall(ctx context.Context) context.Context { @@ -569,18 +633,59 @@ func (b *bearerAuthMiddleware) StartCall(ctx context.Context) context.Context { return metadata.NewOutgoingContext(ctx, metadata.Join(md, b.hdrs)) } +// rotateAuth atomically replaces the stored Authorization metadata and +// returns the previous value plus a snapshot of the logger pointer. +// Splitting this out lets HeadersReceived and SetHeader manage the +// mutex with defer-style discipline while still performing the logger +// call outside of the critical section: log handlers are user-supplied +// and may be slow, so holding b.mutex across them would penalize every +// concurrent StartCall reader. +func (b *bearerAuthMiddleware) rotateAuth(headers ...string) (previous []string, logger *slog.Logger) { + b.mutex.Lock() + defer b.mutex.Unlock() + previous = b.hdrs.Get("authorization") + b.hdrs.Set("authorization", headers...) + return previous, b.logger +} + func (b *bearerAuthMiddleware) HeadersReceived(ctx context.Context, md metadata.MD) { // apache/arrow-adbc#584 headers := md.Get("authorization") - if len(headers) > 0 { - b.mutex.Lock() - defer b.mutex.Unlock() - b.hdrs.Set("authorization", headers...) - } + if len(headers) == 0 { + return + } + previous, logger := b.rotateAuth(headers...) + if logger == nil { + return + } + // Compare lengths rather than values so that we never touch the + // token contents in the log path. Equal lengths can still indicate + // a fresh token (a server might issue tokens of the same shape), + // but for the no-op case (server echoed the same header) the + // reflected length is what an operator wants to see anyway. + var prevLen int + if len(previous) > 0 { + prevLen = len(previous[0]) + } + logger.InfoContext(ctx, "FlightSQL bearer token rotated by server", + "previous_token_length", prevLen, + "new_token_length", len(headers[0]), + "source", "HeadersReceived", + ) } func (b *bearerAuthMiddleware) SetHeader(authValue string) { - b.mutex.Lock() - defer b.mutex.Unlock() - b.hdrs.Set("authorization", authValue) + previous, logger := b.rotateAuth(authValue) + if logger == nil { + return + } + var prevLen int + if len(previous) > 0 { + prevLen = len(previous[0]) + } + logger.Info("FlightSQL bearer token rotated by client", + "previous_token_length", prevLen, + "new_token_length", len(authValue), + "source", "SetHeader", + ) } diff --git a/go/adbc/driver/flightsql/flightsql_driver.go b/go/adbc/driver/flightsql/flightsql_driver.go index 616a1ab09d..4747c38584 100644 --- a/go/adbc/driver/flightsql/flightsql_driver.go +++ b/go/adbc/driver/flightsql/flightsql_driver.go @@ -66,7 +66,17 @@ const ( OptionBoolSessionOptionPrefix = "adbc.flight.sql.session.optionbool." OptionStringListSessionOptionPrefix = "adbc.flight.sql.session.optionstringlist." OptionLastFlightInfo = "adbc.flight.sql.statement.exec.last_flight_info" - infoDriverName = "ADBC Flight SQL Driver - Go" + // OptionStatementLogQueryPreview controls whether the SQL query (or + // Substrait plan) text is included in log records emitted for a + // statement. The value is the maximum number of UTF-8 bytes of the + // query to include in the "query_preview" log attribute. Set to "0" + // (or leave unset) to disable preview logging entirely; in that case + // only a SHA-256 fingerprint and length are recorded. This is opt-in + // because SQL text frequently embeds user-supplied literal values + // (account numbers, credentials, etc.) that should not be persisted + // into shared log streams without explicit operator consent. + OptionStatementLogQueryPreview = "adbc.flight.sql.statement.log.query_preview" + infoDriverName = "ADBC Flight SQL Driver - Go" // Oauth2 options OptionKeyOauthFlow = "adbc.flight.sql.oauth.flow" @@ -131,7 +141,30 @@ func (d *driverImpl) NewDatabaseWithOptionsContext(ctx context.Context, opts map } delete(opts, adbc.OptionKeyURI) - dbBase, err := driverbase.NewDatabaseImplBase(ctx, &d.DriverImplBase) + // The OTEL traces-exporter option must be consumed before + // driverbase.NewDatabaseImplBase runs, because that constructor is + // what stands up the per-database OpenTelemetry tracer. Pulling it + // out of the opts map here lets the driver-manager / TOML profile + // path override the OTEL_TRACES_EXPORTER environment variable + // without any host-process mutation, while empty or absent values + // fall through to the environment-variable behavior preserved by + // InitTracingWithExporter. + tracesExporter := opts[adbc.OptionKeyTelemetryTracesExporter] + delete(opts, adbc.OptionKeyTelemetryTracesExporter) + + // The traces folder path is only meaningful for the "adbcfile" + // exporter, but the option has to be extracted here for the same + // reason as the exporter itself: the RotatingFileWriter is created + // inside driverbase.NewDatabaseImplBase*, so post-construction + // SetOption calls cannot retarget the on-disk location of an + // already-running writer. + tracesFolderPath := opts[adbc.OptionKeyTelemetryTracesFolderPath] + delete(opts, adbc.OptionKeyTelemetryTracesFolderPath) + + dbBase, err := driverbase.NewDatabaseImplBaseWithOptions(ctx, &d.DriverImplBase, driverbase.TracingOptions{ + ExporterName: tracesExporter, + TracingFolderPath: tracesFolderPath, + }) if err != nil { return nil, err } diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go index eb034ec66f..5221a20ebb 100644 --- a/go/adbc/driver/flightsql/flightsql_statement.go +++ b/go/adbc/driver/flightsql/flightsql_statement.go @@ -20,6 +20,7 @@ package flightsql import ( "context" "fmt" + "log/slog" "math" "strconv" "strings" @@ -179,6 +180,24 @@ type statement struct { // Bound data for bulk ingest bound arrow.RecordBatch streamBind array.RecordReader + + // id is a short random identifier assigned when the statement is + // created. It is included as an attribute on every log record emitted + // for the statement and as a "statement_id" ADBC error detail on every + // error surfaced from the statement, so that a single failing + // operation can be traced from host-application logs through driver + // logs to the server. + id string + // log is a slog.Logger pre-decorated with connection_id and + // statement_id so callers can emit structured records without having + // to remember to attach those identifiers. Always non-nil. + log *slog.Logger + // logQueryPreview is the maximum number of UTF-8 bytes of the SQL + // query (or Substrait plan) to include in log records as a + // "query_preview" attribute. Zero (the default) disables preview + // logging entirely; only a SHA-256 fingerprint and length are + // recorded. Configurable via OptionStatementLogQueryPreview. + logQueryPreview int } func (s *statement) closePreparedStatement() error { @@ -251,6 +270,8 @@ func (s *statement) GetOption(key string) (string, error) { return s.timeouts.queryTimeout.String(), nil case OptionTimeoutUpdate: return s.timeouts.updateTimeout.String(), nil + case OptionStatementLogQueryPreview: + return strconv.Itoa(s.logQueryPreview), nil case adbc.OptionKeyIncremental: if s.incrementalState != nil { return adbc.OptionValueEnabled, nil @@ -304,6 +325,8 @@ func (s *statement) GetOptionInt(key string) (int64, error) { return 0, err } return int64(val), nil + case OptionStatementLogQueryPreview: + return int64(s.logQueryPreview), nil } return 0, adbc.Error{ @@ -360,6 +383,16 @@ func (s *statement) SetOption(key string, val string) error { } } return s.SetOptionInt(key, int64(size)) + case OptionStatementLogQueryPreview: + n, err := strconv.Atoi(val) + if err != nil || n < 0 { + return adbc.Error{ + Msg: fmt.Sprintf("[Flight SQL] Invalid value for statement option '%s': '%s' is not a non-negative integer", OptionStatementLogQueryPreview, val), + Code: adbc.StatusInvalidArgument, + } + } + s.logQueryPreview = n + return nil case OptionStatementSubstraitVersion: s.query.substraitVersion = val case adbc.OptionKeyIncremental: @@ -454,6 +487,15 @@ func (s *statement) SetOptionInt(key string, value int64) error { } s.queueSize = int(value) return nil + case OptionStatementLogQueryPreview: + if value < 0 { + return adbc.Error{ + Msg: fmt.Sprintf("[Flight SQL] Invalid value for statement option '%s': '%d' is not a non-negative integer", OptionStatementLogQueryPreview, value), + Code: adbc.StatusInvalidArgument, + } + } + s.logQueryPreview = int(value) + return nil } return s.SetOptionDouble(key, float64(value)) } @@ -490,9 +532,29 @@ func (s *statement) SetSqlQuery(query string) error { } s.targetTable = "" s.query.setSqlQuery(query) + if s.log != nil { + s.log.Debug("FlightSQL SetSqlQuery", s.queryAttrs()...) + } return nil } +// queryAttrs returns slog attributes that fingerprint the SQL or Substrait +// payload currently set on this statement, honoring the +// OptionStatementLogQueryPreview opt-in for inclusion of the SQL text. +// Always safe to log because no parameter values are surfaced. +func (s *statement) queryAttrs() []any { + if s.query.sqlQuery != "" { + return queryFingerprintAttrs(s.query.sqlQuery, s.logQueryPreview) + } + if s.query.substraitPlan != nil { + return substraitFingerprintAttrs(s.query.substraitPlan, s.query.substraitVersion) + } + if s.targetTable != "" { + return []any{slog.String("query_type", "ingest"), slog.String("target_table", s.targetTable)} + } + return []any{slog.String("query_type", "none")} +} + // ExecuteQuery executes the current query or prepared statement // and returns a RecordReader for the results along with the number // of rows affected if known, otherwise it will be -1. @@ -509,6 +571,18 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr array.RecordReader, n return nil, nrec, err } + // Emit a structured "start" record so that operators can pair this + // driver-side event with the corresponding host-application (Mashup / + // Power Query) log entry by statement_id and query fingerprint, even + // when the driver fails before any data is returned. The matching + // "finished" record is emitted via the deferred function below. + startTime := time.Now() + startAttrs := append([]any{ + slog.Bool("prepared", s.prepared != nil), + slog.Bool("hasTxn", s.cnxn.txn != nil), + }, s.queryAttrs()...) + s.log.InfoContext(ctx, "FlightSQL ExecuteQuery start", startAttrs...) + ctx = metadata.NewOutgoingContext(ctx, s.hdrs) var info *flight.FlightInfo var header, trailer metadata.MD @@ -519,12 +593,42 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr array.RecordReader, n info, err = s.query.execute(ctx, s.cnxn, opts...) } + defer func() { + finishAttrs := []any{ + slog.Duration("duration", time.Since(startTime)), + slog.String("phase", "GetFlightInfo"), + } + if info != nil { + finishAttrs = append(finishAttrs, flightInfoLogAttrs(info)...) + } + finishAttrs = append(finishAttrs, correlationHeaderAttrs(header)...) + finishAttrs = append(finishAttrs, correlationHeaderAttrs(trailer)...) + if err != nil { + finishAttrs = append(finishAttrs, "err", err) + s.log.WarnContext(ctx, "FlightSQL ExecuteQuery finished with error", finishAttrs...) + } else { + s.log.InfoContext(ctx, "FlightSQL ExecuteQuery finished", finishAttrs...) + } + }() + if err != nil { - return nil, -1, adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteQuery") + return nil, -1, withOperationIDs( + adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteQuery"), + s.id, s.cnxn.id) } nrec = info.TotalRecords - rdr, err = newRecordReader(ctx, s.alloc, s.cnxn.cl, info, s.clientCache, s.queueSize, s.timeouts) + rdr, err = newRecordReader(withOperationIDsCtx(ctx, s.id, s.cnxn.id), recordReaderConfig{ + alloc: s.alloc, + cl: s.cnxn.cl, + info: info, + clientCache: s.clientCache, + bufferSize: s.queueSize, + logger: s.log, + }, s.timeouts) + if err != nil { + err = withOperationIDs(err, s.id, s.cnxn.id) + } return } @@ -540,6 +644,13 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n int64, err error) { return s.executeIngest(ctx) } + startTime := time.Now() + startAttrs := append([]any{ + slog.Bool("prepared", s.prepared != nil), + slog.Bool("hasTxn", s.cnxn.txn != nil), + }, s.queryAttrs()...) + s.log.InfoContext(ctx, "FlightSQL ExecuteUpdate start", startAttrs...) + ctx = metadata.NewOutgoingContext(ctx, s.hdrs) var header, trailer metadata.MD opts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts) @@ -549,8 +660,25 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n int64, err error) { n, err = s.query.executeUpdate(ctx, s.cnxn, opts...) } + defer func() { + finishAttrs := []any{ + slog.Duration("duration", time.Since(startTime)), + slog.Int64("rowsAffected", n), + } + finishAttrs = append(finishAttrs, correlationHeaderAttrs(header)...) + finishAttrs = append(finishAttrs, correlationHeaderAttrs(trailer)...) + if err != nil { + finishAttrs = append(finishAttrs, "err", err) + s.log.WarnContext(ctx, "FlightSQL ExecuteUpdate finished with error", finishAttrs...) + } else { + s.log.InfoContext(ctx, "FlightSQL ExecuteUpdate finished", finishAttrs...) + } + }() + if err != nil { - err = adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteQuery") + err = withOperationIDs( + adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteQuery"), + s.id, s.cnxn.id) } return @@ -559,11 +687,29 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n int64, err error) { // Prepare turns this statement into a prepared statement to be executed // multiple times. This invalidates any prior result sets. func (s *statement) Prepare(ctx context.Context) error { + startTime := time.Now() + s.log.InfoContext(ctx, "FlightSQL Prepare start", s.queryAttrs()...) + ctx = metadata.NewOutgoingContext(ctx, s.hdrs) var header, trailer metadata.MD prep, err := s.query.prepare(ctx, s.cnxn, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts) + + defer func() { + finishAttrs := []any{slog.Duration("duration", time.Since(startTime))} + finishAttrs = append(finishAttrs, correlationHeaderAttrs(header)...) + finishAttrs = append(finishAttrs, correlationHeaderAttrs(trailer)...) + if err != nil { + finishAttrs = append(finishAttrs, "err", err) + s.log.WarnContext(ctx, "FlightSQL Prepare finished with error", finishAttrs...) + } else { + s.log.InfoContext(ctx, "FlightSQL Prepare finished", finishAttrs...) + } + }() + if err != nil { - return adbcFromFlightStatusWithDetails(err, header, trailer, "Prepare") + return withOperationIDs( + adbcFromFlightStatusWithDetails(err, header, trailer, "Prepare"), + s.id, s.cnxn.id) } s.prepared = prep return nil diff --git a/go/adbc/driver/flightsql/logging.go b/go/adbc/driver/flightsql/logging.go index 4fb12c4112..7397b267fc 100644 --- a/go/adbc/driver/flightsql/logging.go +++ b/go/adbc/driver/flightsql/logging.go @@ -19,16 +19,161 @@ package flightsql import ( "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" "io" "log/slog" + "strconv" + "strings" "time" + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-go/v18/arrow/flight" + "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps" "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) +// safeLogger returns a non-nil *slog.Logger. If the provided logger is nil +// a discard logger is returned so that callers can always safely log without +// nil-checks. This is important because the streaming code paths can be +// reached by callers (such as tests) that do not have a Logger initialized. +// +// The returned logger is always wrapped with otelTraceHandler so that any +// record emitted through it is automatically stamped with "trace_id" and +// "span_id" attributes when its context carries an active OpenTelemetry +// span. The wrap is idempotent, so wrapping an already-wrapped logger is +// a no-op (see withOtelTraceContext). +func safeLogger(logger *slog.Logger) *slog.Logger { + if logger == nil { + logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + } + return withOtelTraceContext(logger) +} + +// maxLoggedTicketBytes limits how many bytes of a Flight ticket are emitted in +// log records. Tickets can be opaque server-defined blobs of arbitrary size +// (sometimes embedding large query plans), so we cap how much we include +// to keep log records reasonably sized while still being useful for +// correlation against server-side logs. +const maxLoggedTicketBytes = 32 + +// endpointLogAttrs builds a slice of slog.Attr describing a Flight endpoint. +// These attributes are intended to be attached to every log record emitted on +// behalf of a per-endpoint stream so an operator can correlate a failure with +// the specific endpoint (URI, ticket prefix, etc.) that produced it. This is +// particularly important when diagnosing errors like +// "[FlightSQL] error reading from server: EOF (Unavailable; DoGet: endpoint N: [])" +// where the empty "[]" indicates the FlightInfo's Location list was empty and +// the default client connection was used as a fallback. +func endpointLogAttrs(endpointIndex, numEndpoints int, endpoint *flight.FlightEndpoint) []any { + attrs := []any{ + slog.Int("endpointIndex", endpointIndex), + slog.Int("numEndpoints", numEndpoints), + } + if endpoint == nil { + return attrs + } + if endpoint.Ticket != nil { + ticket := endpoint.Ticket.Ticket + attrs = append(attrs, slog.Int("ticketBytes", len(ticket))) + if len(ticket) > 0 { + limit := len(ticket) + if limit > maxLoggedTicketBytes { + limit = maxLoggedTicketBytes + } + attrs = append(attrs, slog.String("ticketPrefixHex", hex.EncodeToString(ticket[:limit]))) + } + } + if len(endpoint.Location) == 0 { + attrs = append(attrs, slog.String("locations", "")) + } else { + uris := make([]string, 0, len(endpoint.Location)) + for _, loc := range endpoint.Location { + uris = append(uris, loc.Uri) + } + attrs = append(attrs, slog.Any("locations", uris)) + } + if endpoint.ExpirationTime != nil { + attrs = append(attrs, slog.Time("expirationTime", endpoint.ExpirationTime.AsTime())) + } + return attrs +} + +// streamProgress tracks per-endpoint streaming statistics so that informative +// log records and error messages can be emitted when a stream ends (either +// successfully or with an error such as a mid-stream EOF). It is safe to be +// used by a single goroutine that owns one endpoint's stream. +type streamProgress struct { + start time.Time + firstBatchAt time.Time + lastBatchAt time.Time + batchesRead int64 + recordsRead int64 + bytesEstimate int64 +} + +func newStreamProgress() *streamProgress { + return &streamProgress{start: time.Now()} +} + +// recordBatch updates the progress tracker after a single Arrow record batch +// has been successfully received from the server. +func (p *streamProgress) recordBatch(rows int64, bytes int64) { + now := time.Now() + if p.batchesRead == 0 { + p.firstBatchAt = now + } + p.lastBatchAt = now + p.batchesRead++ + p.recordsRead += rows + p.bytesEstimate += bytes +} + +// logAttrs returns slog attributes summarizing the progress of this stream. +// These attributes are intended to be appended to per-endpoint log records or +// embedded into error messages produced when a stream ends unexpectedly. +func (p *streamProgress) logAttrs() []any { + attrs := []any{ + slog.Int64("batchesRead", p.batchesRead), + slog.Int64("recordsRead", p.recordsRead), + slog.Int64("approxBytesRead", p.bytesEstimate), + slog.Duration("elapsed", time.Since(p.start)), + } + if !p.firstBatchAt.IsZero() { + attrs = append(attrs, slog.Duration("timeToFirstBatch", p.firstBatchAt.Sub(p.start))) + } else { + attrs = append(attrs, slog.String("timeToFirstBatch", "never")) + } + if !p.lastBatchAt.IsZero() { + attrs = append(attrs, slog.Duration("timeSinceLastBatch", time.Since(p.lastBatchAt))) + } + return attrs +} + +// summary returns a compact human-readable summary of the stream's progress +// that can be embedded into wrapped error messages so the diagnostic +// information survives when only the error string is preserved (for example +// when the error is exported through a C-language client). +func (p *streamProgress) summary() string { + if p.batchesRead == 0 { + return "no batches received before failure; elapsed=" + time.Since(p.start).String() + } + return "received " + formatInt(p.batchesRead) + " batch(es), " + + formatInt(p.recordsRead) + " row(s) before failure; elapsed=" + time.Since(p.start).String() + + "; timeSinceLastBatch=" + time.Since(p.lastBatchAt).String() +} + +// formatInt formats an int64 without pulling in the heavier fmt machinery for +// what would otherwise be a hot, allocation-sensitive path in error messages. +func formatInt(n int64) string { + return strconv.FormatInt(n, 10) +} + func makeUnaryLoggingInterceptor(logger *slog.Logger) grpc.UnaryClientInterceptor { interceptor := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() @@ -36,11 +181,20 @@ func makeUnaryLoggingInterceptor(logger *slog.Logger) grpc.UnaryClientIntercepto outgoing, _ := metadata.FromOutgoingContext(ctx) err := invoker(ctx, method, req, reply, cc, opts...) if logger.Enabled(ctx, slog.LevelDebug) { - logger.DebugContext(ctx, method, "target", cc.Target(), "duration", time.Since(start), "err", err, "metadata", outgoing) + args := []any{"target", cc.Target(), "duration", time.Since(start), "err", err, "metadata", outgoing} + args = append(args, outgoingCallHeaderAttrs(ctx)...) + args = append(args, grpcStatusAttrs(err)...) + logger.DebugContext(ctx, method, args...) } else { keys := maps.Keys(outgoing) slices.Sort(keys) - logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(start), "err", err, "metadata", keys) + args := []any{"target", cc.Target(), "duration", time.Since(start), "err", err, "metadata", keys} + // Surface curated outbound correlation IDs (e.g. PBI + // ActivityId, x-ms-client-request-id) regardless of log + // level so they are always available for cross-log joins. + args = append(args, outgoingCallHeaderAttrs(ctx)...) + args = append(args, grpcStatusAttrs(err)...) + logger.InfoContext(ctx, method, args...) } return err } @@ -54,7 +208,10 @@ func makeStreamLoggingInterceptor(logger *slog.Logger) grpc.StreamClientIntercep outgoing, _ := metadata.FromOutgoingContext(ctx) stream, err := streamer(ctx, desc, cc, method, opts...) if err != nil { - logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(start), "err", err) + args := []any{"target", cc.Target(), "duration", time.Since(start), "err", err} + args = append(args, outgoingCallHeaderAttrs(ctx)...) + args = append(args, grpcStatusAttrs(err)...) + logger.InfoContext(ctx, method, args...) return stream, err } @@ -72,23 +229,478 @@ type loggedStream struct { start time.Time target string outgoing metadata.MD + + // recvCount tracks how many messages were successfully received from the + // server before the stream ended. This is logged on every termination + // (success or failure) so an operator can tell whether a stream that + // failed with "EOF/Unavailable" never received any data or died mid-way + // through a large result set. + recvCount int64 } func (stream *loggedStream) RecvMsg(m any) error { err := stream.ClientStream.RecvMsg(m) - if err != nil { - loggedErr := err - if loggedErr == io.EOF { - loggedErr = nil - } + if err == nil { + stream.recvCount++ + return nil + } - if stream.logger.Enabled(stream.ctx, slog.LevelDebug) { - stream.logger.DebugContext(stream.ctx, stream.method, "target", stream.target, "duration", time.Since(stream.start), "err", loggedErr, "metadata", stream.outgoing) - } else { - keys := maps.Keys(stream.outgoing) - slices.Sort(keys) - stream.logger.InfoContext(stream.ctx, stream.method, "target", stream.target, "duration", time.Since(stream.start), "err", loggedErr, "metadata", keys) + loggedErr := err + if loggedErr == io.EOF { + loggedErr = nil + } + + // Attempt to capture trailer metadata from the underlying stream. Trailers + // are only valid once the stream has terminated, which is the case here + // because RecvMsg returned a non-nil error. Trailers frequently carry + // server-side diagnostic information (e.g., grpc-message, custom error + // detail headers) that is invaluable when triaging "[FlightSQL] error + // reading from server: EOF (Unavailable; ...)" reports. + trailer := stream.Trailer() + + if stream.logger.Enabled(stream.ctx, slog.LevelDebug) { + stream.logger.DebugContext(stream.ctx, stream.method, + "target", stream.target, + "duration", time.Since(stream.start), + "err", loggedErr, + "recvMessages", stream.recvCount, + "metadata", stream.outgoing, + "trailer", trailer, + ) + } else { + keys := maps.Keys(stream.outgoing) + slices.Sort(keys) + trailerKeys := maps.Keys(trailer) + slices.Sort(trailerKeys) + args := []any{ + "target", stream.target, + "duration", time.Since(stream.start), + "err", loggedErr, + "recvMessages", stream.recvCount, + "metadata", keys, + "trailer", trailerKeys, + } + // Promote curated correlation/tracing header values from the trailer + // to first-class log attributes so an operator can cross-reference + // this failure with the corresponding server-side trace without + // enabling Debug-level logging. + args = append(args, correlationHeaderAttrs(trailer)...) + // Also promote the outbound correlation IDs that the caller + // supplied (PBI ActivityId, x-ms-client-request-id, ...) so a + // single log line carries both sides of the join. + args = append(args, outgoingCallHeaderAttrs(stream.ctx)...) + // Emit gRPC code/message as separate structured fields. EOF + // is treated as a clean close by Flight so loggedErr was + // nil-ed out above; only attach status attrs for real errors. + if loggedErr != nil { + args = append(args, grpcStatusAttrs(loggedErr)...) } + stream.logger.InfoContext(stream.ctx, stream.method, args...) } return err } + +// wellKnownCorrelationHeaders is the curated allow-list of inbound gRPC +// header/trailer keys that are surfaced verbatim into log records. These +// headers are commonly emitted by FlightSQL servers, gateways, and tracing +// frameworks to allow operators to cross-reference a client-side log entry +// against the corresponding server-side trace or query history record. +// +// The allow-list also intentionally covers the Microsoft / Power BI / Power +// Query family of correlation identifiers ("ActivityId", +// "x-ms-client-request-id", etc.). The driver is frequently invoked from +// Power BI Desktop / Mashup, which records every step under a per-step +// ActivityId GUID; capturing that value on the ADBC side is the single +// most useful join column when triaging an issue against a Power BI +// diagnostic trace bundle. +var wellKnownCorrelationHeaders = []string{ + "x-request-id", + "x-correlation-id", + "x-trace-id", + "x-amzn-trace-id", + "x-b3-traceid", + "x-b3-spanid", + "traceparent", + "tracestate", + "x-arrow-flight-session-id", + "x-dremio-request-id", + "x-dremio-query-id", + "x-server-version", + "server", + // Microsoft / Power BI / Power Query family. The exact casing varies + // by host (PBI Desktop emits "ActivityId" as a property in its trace + // file but transmits it as a gRPC header that gRPC's metadata + // package normalizes to lower case), so the allow-list uses the + // canonical lower-case form throughout. Both the unprefixed and + // "x-ms-" prefixed variants are listed because the prefix-less form + // is what Mashup's own diagnostics record. + "activityid", + "activity-id", + "x-ms-activity-id", + "x-ms-client-request-id", + "x-ms-request-id", + "requestid", + "x-pbi-activity-id", +} + +// sensitiveHeaderTokens lists case-insensitive substrings whose presence +// in a header name marks the header as carrying credential material that +// must never be surfaced in driver logs. The list is consulted by +// isSensitiveHeader, which in turn gates correlationHeaderAttrs's +// allow-list and suffix-match paths. The substrings are deliberately +// coarse — "token" rather than a specific header name — because the +// cost of a false positive (a useful correlation header is skipped) is +// much lower than the cost of a false negative (a bearer token is +// written to disk). Header names known to be tracking-only and that +// happen to contain a denylist substring should be added directly to +// wellKnownCorrelationHeaders only after confirming they do not carry +// credentials on the target server. +var sensitiveHeaderTokens = []string{ + "authorization", + "cookie", + "password", + "secret", + "private", + "credential", + "token", + "apikey", + "api-key", +} + +// isSensitiveHeader reports whether name (compared case-insensitively) +// matches any of the substrings in sensitiveHeaderTokens. It is the only +// place the driver guards against accidentally logging credential +// material lifted out of gRPC metadata; correlationHeaderAttrs and +// outgoingCallHeaderAttrs both consult it before promoting a header. +func isSensitiveHeader(name string) bool { + lower := strings.ToLower(name) + for _, tok := range sensitiveHeaderTokens { + if strings.Contains(lower, tok) { + return true + } + } + return false +} + +// headerAttrsWithPrefix is the shared implementation behind +// correlationHeaderAttrs (incoming headers/trailers) and +// outgoingCallHeaderAttrs (call-time outbound metadata). Both surfaces +// use the same allow-list and the same sensitive-header denylist; only +// the slog attribute prefix differs so an operator can tell at a glance +// whether the value was something the driver sent or something the +// server returned. Returns nil when no allow-listed header is present +// so callers can use append(...). +func headerAttrsWithPrefix(md metadata.MD, prefix string) []any { + if len(md) == 0 { + return nil + } + out := make([]any, 0, 4) + seen := make(map[string]bool, len(wellKnownCorrelationHeaders)) + for _, k := range wellKnownCorrelationHeaders { + seen[k] = true + if isSensitiveHeader(k) { + continue + } + if vals := md.Get(k); len(vals) > 0 { + out = append(out, slog.Any(prefix+k, vals)) + } + } + for k, vals := range md { + lk := strings.ToLower(k) + if seen[lk] { + continue + } + if isSensitiveHeader(lk) { + continue + } + if strings.HasSuffix(lk, "-request-id") || + strings.HasSuffix(lk, "-trace-id") || + strings.HasSuffix(lk, "-query-id") || + strings.HasSuffix(lk, "-session-id") || + strings.HasSuffix(lk, "-activity-id") { + out = append(out, slog.Any(prefix+lk, vals)) + } + } + return out +} + +// correlationHeaderAttrs returns slog attributes for the well-known +// correlation/tracing headers present in md (typically headers or +// trailers received from the server). Attribute names use the "hdr_" +// prefix to distinguish them from outbound headers, which use +// "out_hdr_" (see outgoingCallHeaderAttrs). Sensitive headers (matched +// by isSensitiveHeader) are always skipped. +func correlationHeaderAttrs(md metadata.MD) []any { + return headerAttrsWithPrefix(md, "hdr_") +} + +// outgoingCallHeaderAttrs returns slog attributes for the well-known +// correlation headers present on ctx's outbound gRPC metadata (the +// values the caller — for example Power Query — set before invoking +// the driver). Attribute names use the "out_hdr_" prefix so they can +// be visually separated from incoming response headers in a single +// log line. This is the join column that pairs an ADBC log record with +// the PBI / Mashup trace entry that triggered it. +func outgoingCallHeaderAttrs(ctx context.Context) []any { + if ctx == nil { + return nil + } + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + return nil + } + return headerAttrsWithPrefix(md, "out_hdr_") +} + +// grpcStatusAttrs returns slog attributes describing the gRPC status +// embedded in err, or nil when err is nil or carries no gRPC status. +// The attributes ("grpc_code" and "grpc_message") are emitted as +// first-class structured fields rather than left inside the formatted +// error string so an operator can filter on them directly when +// triaging a batch of incidents — "Unavailable", "DeadlineExceeded" +// and "Unauthenticated" require very different remediations. +func grpcStatusAttrs(err error) []any { + if err == nil { + return nil + } + st, ok := status.FromError(err) + if !ok { + return nil + } + return []any{ + slog.String("grpc_code", st.Code().String()), + slog.String("grpc_message", st.Message()), + } +} + +// otelTraceHandler wraps an slog.Handler so that every record produced +// through it is automatically stamped with the current OpenTelemetry +// "trace_id" and "span_id" when the record's context carries an active +// span. This bridges the driver's structured log stream to its +// OpenTelemetry traces (and, transitively, to server-side traces that +// follow the same trace context via the W3C "traceparent" header), so +// a single identifier can be used to join all three views of a single +// failing operation. +type otelTraceHandler struct { + inner slog.Handler +} + +func (h *otelTraceHandler) Enabled(ctx context.Context, level slog.Level) bool { + return h.inner.Enabled(ctx, level) +} + +func (h *otelTraceHandler) Handle(ctx context.Context, r slog.Record) error { + if ctx != nil { + sc := trace.SpanFromContext(ctx).SpanContext() + if sc.IsValid() { + r.AddAttrs( + slog.String("trace_id", sc.TraceID().String()), + slog.String("span_id", sc.SpanID().String()), + ) + } + } + return h.inner.Handle(ctx, r) +} + +func (h *otelTraceHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &otelTraceHandler{inner: h.inner.WithAttrs(attrs)} +} + +func (h *otelTraceHandler) WithGroup(name string) slog.Handler { + return &otelTraceHandler{inner: h.inner.WithGroup(name)} +} + +// withOtelTraceContext wraps logger so that every record it produces +// carries "trace_id" and "span_id" attributes derived from the +// OpenTelemetry span (if any) attached to the record's context. The +// wrap is idempotent: calling it twice on the same logger does not +// produce duplicate handlers. A nil logger is returned unchanged so +// callers can chain it freely. +func withOtelTraceContext(logger *slog.Logger) *slog.Logger { + if logger == nil { + return logger + } + if _, alreadyWrapped := logger.Handler().(*otelTraceHandler); alreadyWrapped { + return logger + } + return slog.New(&otelTraceHandler{inner: logger.Handler()}) +} + +// newRandomID returns a short random identifier suitable for tagging log +// records and ADBC error details. The result has the form "-" +// where hex is 12 lower-case hex characters (48 bits of entropy) — enough +// to disambiguate concurrent operations within a single process without +// inflating log line widths. Falls back to a nanosecond timestamp if the +// crypto/rand source is unavailable. +func newRandomID(prefix string) string { + var b [6]byte + if _, err := rand.Read(b[:]); err != nil { + return prefix + "-" + strconv.FormatInt(time.Now().UnixNano(), 16) + } + return prefix + "-" + hex.EncodeToString(b[:]) +} + +// queryFingerprintAttrs builds slog attributes identifying a SQL query in a +// way that is safe to log even when the query may contain sensitive literal +// values from end-user inputs. Always emits the query length and the first +// 16 hex characters of its SHA-256 digest so two log lines reporting the +// "same" query can be matched without exposing the text. When previewLimit +// is greater than zero a "query_preview" attribute is also emitted holding +// the first previewLimit characters of the query verbatim — this is opt-in +// because Power Query / Mashup workflows can embed user data in SQL. +func queryFingerprintAttrs(query string, previewLimit int) []any { + if query == "" { + return []any{slog.String("query_type", "empty")} + } + h := sha256.Sum256([]byte(query)) + attrs := []any{ + slog.String("query_type", "sql"), + slog.Int("query_length", len(query)), + slog.String("query_sha256_prefix", hex.EncodeToString(h[:8])), + } + if previewLimit > 0 { + preview := query + truncated := false + if len(preview) > previewLimit { + preview = preview[:previewLimit] + truncated = true + } + attrs = append(attrs, slog.String("query_preview", preview)) + if truncated { + attrs = append(attrs, slog.Bool("query_preview_truncated", true)) + } + } + return attrs +} + +// substraitFingerprintAttrs builds slog attributes identifying a Substrait +// plan. The plan bytes themselves are never logged; only the length, the +// SHA-256 prefix, and the declared protocol version are emitted so an +// operator can fingerprint the plan without exposing its contents. +func substraitFingerprintAttrs(plan []byte, version string) []any { + if len(plan) == 0 { + return []any{slog.String("query_type", "substrait_empty")} + } + h := sha256.Sum256(plan) + attrs := []any{ + slog.String("query_type", "substrait"), + slog.Int("substrait_plan_bytes", len(plan)), + slog.String("substrait_plan_sha256_prefix", hex.EncodeToString(h[:8])), + } + if version != "" { + attrs = append(attrs, slog.String("substrait_version", version)) + } + return attrs +} + +// flightInfoLogAttrs returns slog attributes describing a FlightInfo +// response. The attributes include correlation hooks (descriptor command +// type and a hex prefix of the descriptor command bytes plus a hex prefix +// of FlightInfo.AppMetadata — many backends, including Dremio and Spice, +// embed an opaque server-side query handle in AppMetadata) and capacity +// information (number of endpoints, advertised total records/bytes). The +// total records and bytes are advisory only as servers are not required to +// populate them. +func flightInfoLogAttrs(info *flight.FlightInfo) []any { + if info == nil { + return nil + } + attrs := []any{ + slog.Int("numEndpoints", len(info.Endpoint)), + slog.Int64("totalRecords", info.TotalRecords), + slog.Int64("totalBytes", info.TotalBytes), + slog.Bool("haveSchemaInFlightInfo", len(info.Schema) > 0), + } + if desc := info.FlightDescriptor; desc != nil { + attrs = append(attrs, slog.String("descriptorType", desc.Type.String())) + if len(desc.Cmd) > 0 { + limit := len(desc.Cmd) + if limit > maxLoggedTicketBytes { + limit = maxLoggedTicketBytes + } + attrs = append(attrs, + slog.Int("descriptorCmdBytes", len(desc.Cmd)), + slog.String("descriptorCmdPrefixHex", hex.EncodeToString(desc.Cmd[:limit])), + ) + } + if len(desc.Path) > 0 { + attrs = append(attrs, slog.Any("descriptorPath", desc.Path)) + } + } + if len(info.AppMetadata) > 0 { + limit := len(info.AppMetadata) + if limit > maxLoggedTicketBytes { + limit = maxLoggedTicketBytes + } + attrs = append(attrs, + slog.Int("appMetadataBytes", len(info.AppMetadata)), + slog.String("appMetadataPrefixHex", hex.EncodeToString(info.AppMetadata[:limit])), + ) + } + return attrs +} + +// withOperationIDs returns err with "statement_id" and "connection_id" +// text error details appended when err is an adbc.Error. Otherwise the +// original error is returned unchanged. Recording these identifiers in +// ADBC's error details surfaces them all the way to the host application +// (for example Power Query / Mashup), enabling a single failing operation +// to be traced through both client-side logs and the host's own logs. +func withOperationIDs(err error, statementID, connectionID string) error { + if err == nil { + return err + } + adbcErr, ok := err.(adbc.Error) + if !ok { + return err + } + if statementID != "" { + adbcErr.Details = append(adbcErr.Details, + &adbc.TextErrorDetail{Name: "statement_id", Detail: statementID}) + } + if connectionID != "" { + adbcErr.Details = append(adbcErr.Details, + &adbc.TextErrorDetail{Name: "connection_id", Detail: connectionID}) + } + return adbcErr +} + +// operationIDsCtxKey is the context.Value key used to propagate the +// statement_id and connection_id identifiers from a statement's Execute +// method into asynchronous code paths (such as the per-endpoint goroutines +// inside newRecordReader) so that errors surfaced later through +// reader.Err() can be stamped with the same correlation IDs that the +// synchronous error path uses. +type operationIDsCtxKey struct{} + +type operationIDs struct { + statementID string + connectionID string +} + +// withOperationIDsCtx attaches the given statement and connection +// identifiers to ctx so they can be retrieved by operationIDsFromCtx in +// downstream code such as newRecordReader. Returns ctx unchanged if both +// identifiers are empty. +func withOperationIDsCtx(ctx context.Context, statementID, connectionID string) context.Context { + if statementID == "" && connectionID == "" { + return ctx + } + return context.WithValue(ctx, operationIDsCtxKey{}, operationIDs{ + statementID: statementID, + connectionID: connectionID, + }) +} + +// operationIDsFromCtx returns the statement and connection identifiers +// previously stored on ctx by withOperationIDsCtx, or empty strings if +// none were set. Safe to call on any context. +func operationIDsFromCtx(ctx context.Context) (statementID, connectionID string) { + if ctx == nil { + return "", "" + } + if v, ok := ctx.Value(operationIDsCtxKey{}).(operationIDs); ok { + return v.statementID, v.connectionID + } + return "", "" +} diff --git a/go/adbc/driver/flightsql/logging_test.go b/go/adbc/driver/flightsql/logging_test.go new file mode 100644 index 0000000000..c7dbc0a2fc --- /dev/null +++ b/go/adbc/driver/flightsql/logging_test.go @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package flightsql + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "testing" + + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// TestIsSensitiveHeader checks the credential-substring denylist that +// gates header promotion in headerAttrsWithPrefix. The intent of the +// denylist is to be coarse: false positives (a useful header is +// skipped) are acceptable, false negatives (a credential header is +// logged) are not. +func TestIsSensitiveHeader(t *testing.T) { + cases := []struct { + name string + input string + want bool + category string + }{ + // Credential headers — must all be flagged. + {"authorization", "authorization", true, "credential"}, + {"authorization_mixed_case", "Authorization", true, "credential"}, + {"proxy-authorization", "Proxy-Authorization", true, "credential"}, + {"cookie", "Cookie", true, "credential"}, + {"set-cookie", "Set-Cookie", true, "credential"}, + {"x-api-key", "x-api-key", true, "credential"}, + {"x-auth-token", "X-Auth-Token", true, "credential"}, + {"x-csrf-token", "X-Csrf-Token", true, "credential"}, + {"any-password-header", "X-User-Password", true, "credential"}, + {"any-secret-header", "X-Shared-Secret", true, "credential"}, + {"private-key", "X-Private-Key", true, "credential"}, + {"credentials-bin", "x-credentials-bin", true, "credential"}, + {"apikey-mashed", "MyApikey", true, "credential"}, + + // Tracking / correlation headers — must NOT be flagged so they + // can be promoted into log records. + {"x-request-id", "x-request-id", false, "tracking"}, + {"x-correlation-id", "x-correlation-id", false, "tracking"}, + {"traceparent", "traceparent", false, "tracking"}, + {"tracestate", "tracestate", false, "tracking"}, + {"x-b3-traceid", "x-b3-traceid", false, "tracking"}, + {"activityid", "activityid", false, "tracking"}, + {"activity-id", "activity-id", false, "tracking"}, + {"x-ms-client-request-id", "x-ms-client-request-id", false, "tracking"}, + {"x-ms-activity-id", "x-ms-activity-id", false, "tracking"}, + {"x-pbi-activity-id", "x-pbi-activity-id", false, "tracking"}, + {"empty", "", false, "edge"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := isSensitiveHeader(tc.input) + if got != tc.want { + t.Fatalf("isSensitiveHeader(%q) = %v, want %v (%s)", + tc.input, got, tc.want, tc.category) + } + }) + } +} + +// TestSensitiveTokenDoesNotCollideWithAllowlist guards against a future +// edit that adds an entry to wellKnownCorrelationHeaders whose name +// happens to contain a sensitiveHeaderTokens substring. Such an entry +// would be silently dropped at log time (because headerAttrsWithPrefix +// consults isSensitiveHeader before promoting any allow-listed header), +// so we surface the contradiction here at test time. +func TestSensitiveTokenDoesNotCollideWithAllowlist(t *testing.T) { + for _, h := range wellKnownCorrelationHeaders { + if isSensitiveHeader(h) { + t.Fatalf("wellKnownCorrelationHeaders entry %q is flagged sensitive by isSensitiveHeader; "+ + "either rename the header or remove it from the allowlist", h) + } + } +} + +// TestHeaderAttrsWithPrefix_AllowAndDeny exercises the curated allow-list, +// the suffix-match path, and the sensitive-header denylist. The function +// is the engine behind both correlationHeaderAttrs ("hdr_" prefix, used +// on received headers / trailers) and outgoingCallHeaderAttrs +// ("out_hdr_" prefix, used on call-time outgoing metadata). +func TestHeaderAttrsWithPrefix_AllowAndDeny(t *testing.T) { + md := metadata.New(map[string]string{ + // Allow-listed exact match. + "x-request-id": "req-1", + // Microsoft / PBI allow-listed exact match. + "activityid": "act-1", + // Suffix-match: ends with -request-id (not on the allow-list + // literally, but caught by the suffix rule). + "x-vendor-request-id": "vreq-1", + // New activity-id suffix rule. + "x-pbi-activity-id": "pbi-act-1", + // Credential header — must be filtered. + "authorization": "Bearer SECRET", + // Credential by substring — must be filtered. + "x-tenant-token": "tok-1", + // Random header with no allow-list / suffix hit — must be + // ignored to avoid log spam. + "x-random-header": "noise", + }) + + got := headerAttrsWithPrefix(md, "hdr_") + + // Convert []any of alternating key/value pairs into a map for + // stable assertions. Each slog.Any takes the form (string, slice). + gotMap := slogAttrsToMap(t, got) + + wantPresent := []string{ + "hdr_x-request-id", + "hdr_activityid", + "hdr_x-vendor-request-id", + "hdr_x-pbi-activity-id", + } + for _, k := range wantPresent { + if _, ok := gotMap[k]; !ok { + t.Errorf("expected attribute %q in headerAttrsWithPrefix result, got keys=%v", + k, sortedKeys(gotMap)) + } + } + + wantAbsent := []string{ + "hdr_authorization", + "hdr_x-tenant-token", + "hdr_x-random-header", + } + for _, k := range wantAbsent { + if _, ok := gotMap[k]; ok { + t.Errorf("unexpected attribute %q in headerAttrsWithPrefix result "+ + "(must be filtered)", k) + } + } +} + +// TestHeaderAttrsWithPrefix_EmptyMetadata verifies the function returns +// nil (not an empty slice) when there is nothing to log, so callers can +// safely use append(...) without producing an empty placeholder entry. +func TestHeaderAttrsWithPrefix_EmptyMetadata(t *testing.T) { + if got := headerAttrsWithPrefix(nil, "hdr_"); got != nil { + t.Fatalf("headerAttrsWithPrefix(nil, _) = %v, want nil", got) + } + if got := headerAttrsWithPrefix(metadata.MD{}, "hdr_"); got != nil { + t.Fatalf("headerAttrsWithPrefix(empty, _) = %v, want nil", got) + } +} + +// TestCorrelationVsOutgoingPrefix asserts the two public wrappers use +// distinct prefixes so received and sent headers never collide in a +// single log line. +func TestCorrelationVsOutgoingPrefix(t *testing.T) { + md := metadata.New(map[string]string{ + "activityid": "act-99", + }) + + in := slogAttrsToMap(t, correlationHeaderAttrs(md)) + if _, ok := in["hdr_activityid"]; !ok { + t.Errorf("correlationHeaderAttrs did not emit hdr_activityid; got %v", + sortedKeys(in)) + } + + ctx := metadata.NewOutgoingContext(context.Background(), md) + out := slogAttrsToMap(t, outgoingCallHeaderAttrs(ctx)) + if _, ok := out["out_hdr_activityid"]; !ok { + t.Errorf("outgoingCallHeaderAttrs did not emit out_hdr_activityid; got %v", + sortedKeys(out)) + } +} + +// TestOutgoingCallHeaderAttrs_NilOrMissingContext covers the safety paths +// (nil context, context without outbound metadata) so that the call +// sites in the unary/stream interceptors do not need their own +// nil-guards. +func TestOutgoingCallHeaderAttrs_NilOrMissingContext(t *testing.T) { + // Use a typed nil context.Context variable rather than the untyped + // `nil` literal: staticcheck SA1012 flags passing the literal `nil` + // to a context.Context parameter (and offers two conflicting + // auto-fixes that collide in the linter). The function under test + // *does* have a `ctx == nil` guard that we want to exercise, and a + // typed nil interface value still compares equal to nil, so this + // covers the same branch without tripping the lint rule. + var nilCtx context.Context + if got := outgoingCallHeaderAttrs(nilCtx); got != nil { + t.Fatalf("outgoingCallHeaderAttrs(nil) = %v, want nil", got) + } + if got := outgoingCallHeaderAttrs(context.Background()); got != nil { + t.Fatalf("outgoingCallHeaderAttrs(context.Background()) = %v, want nil "+ + "(no outbound metadata set)", got) + } +} + +// TestGrpcStatusAttrs covers the helper that promotes a gRPC status to +// its own structured "grpc_code"/"grpc_message" log fields. The helper +// must handle nil errors, plain Go errors, real gRPC status errors, +// and gRPC errors that have been wrapped via fmt.Errorf("%w", ...). +func TestGrpcStatusAttrs(t *testing.T) { + t.Run("nil_error", func(t *testing.T) { + if got := grpcStatusAttrs(nil); got != nil { + t.Fatalf("grpcStatusAttrs(nil) = %v, want nil", got) + } + }) + + t.Run("plain_error", func(t *testing.T) { + // errors.New does not carry a GRPCStatus()/Unwrap chain, so + // status.FromError returns ok=false and the helper returns + // nil rather than synthesizing a fake code. + if got := grpcStatusAttrs(errors.New("boom")); got != nil { + t.Fatalf("grpcStatusAttrs(errors.New) = %v, want nil", got) + } + }) + + t.Run("grpc_status_error", func(t *testing.T) { + err := status.Error(codes.Unavailable, "DoGet: endpoint 0") + got := slogAttrsToMap(t, grpcStatusAttrs(err)) + if v := got["grpc_code"]; v != "Unavailable" { + t.Errorf("grpc_code = %q, want %q", v, "Unavailable") + } + if v := got["grpc_message"]; v != "DoGet: endpoint 0" { + t.Errorf("grpc_message = %q, want %q", v, "DoGet: endpoint 0") + } + }) + + t.Run("wrapped_grpc_status_error", func(t *testing.T) { + inner := status.Error(codes.DeadlineExceeded, "timeout") + wrapped := fmt.Errorf("outer: %w", inner) + got := slogAttrsToMap(t, grpcStatusAttrs(wrapped)) + if v := got["grpc_code"]; v != "DeadlineExceeded" { + t.Errorf("grpc_code = %q, want %q", v, "DeadlineExceeded") + } + }) +} + +// TestOtelTraceHandler_InjectsTraceIDs creates an slog handler chain +// "JSON -> otelTraceHandler -> buffer", emits a record with a context +// carrying a known SpanContext, and verifies that the handler stamped +// "trace_id" and "span_id" attributes onto the resulting record. This +// is the bridge between the driver's slog stream and any external +// OpenTelemetry traces the host application is producing. +func TestOtelTraceHandler_InjectsTraceIDs(t *testing.T) { + var buf bytes.Buffer + base := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}) + logger := slog.New(&otelTraceHandler{inner: base}) + + tidHex := "4bf92f3577b34da6a3ce929d0e0e4736" + sidHex := "00f067aa0ba902b7" + tid, err := trace.TraceIDFromHex(tidHex) + if err != nil { + t.Fatalf("TraceIDFromHex: %v", err) + } + sid, err := trace.SpanIDFromHex(sidHex) + if err != nil { + t.Fatalf("SpanIDFromHex: %v", err) + } + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tid, + SpanID: sid, + TraceFlags: trace.FlagsSampled, + Remote: true, + }) + ctx := trace.ContextWithSpanContext(context.Background(), sc) + + logger.InfoContext(ctx, "test event") + + rec := decodeFirstLogLine(t, buf.Bytes()) + if got := rec["trace_id"]; got != tidHex { + t.Errorf("trace_id = %q, want %q (full record: %v)", got, tidHex, rec) + } + if got := rec["span_id"]; got != sidHex { + t.Errorf("span_id = %q, want %q (full record: %v)", got, sidHex, rec) + } +} + +// TestOtelTraceHandler_NoSpanLeavesRecordUnchanged ensures the handler +// is a no-op when the context does not carry a valid SpanContext. We +// must not invent placeholder trace/span IDs just to fill the slot — +// otherwise downstream log search would match unrelated records. +func TestOtelTraceHandler_NoSpanLeavesRecordUnchanged(t *testing.T) { + var buf bytes.Buffer + base := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}) + logger := slog.New(&otelTraceHandler{inner: base}) + + logger.InfoContext(context.Background(), "no span") + + rec := decodeFirstLogLine(t, buf.Bytes()) + if _, ok := rec["trace_id"]; ok { + t.Errorf("unexpected trace_id in record without active span: %v", rec) + } + if _, ok := rec["span_id"]; ok { + t.Errorf("unexpected span_id in record without active span: %v", rec) + } +} + +// TestWithOtelTraceContext_Idempotent verifies the wrap helper does not +// stack handlers on repeated calls. Without this guard every +// derivation step (NewConnection, NewStatement, ...) would add another +// wrapper, slowing logging and bloating handler chains over time. +func TestWithOtelTraceContext_Idempotent(t *testing.T) { + if got := withOtelTraceContext(nil); got != nil { + t.Fatalf("withOtelTraceContext(nil) = %v, want nil", got) + } + + base := slog.New(slog.NewJSONHandler(&bytes.Buffer{}, nil)) + wrapped1 := withOtelTraceContext(base) + wrapped2 := withOtelTraceContext(wrapped1) + if _, ok := wrapped2.Handler().(*otelTraceHandler); !ok { + t.Fatalf("expected outer handler to be *otelTraceHandler after double-wrap") + } + // Drill one level into the inner handler — it must not itself be + // another *otelTraceHandler (which would mean the helper stacked + // instead of de-duplicating). + outer := wrapped2.Handler().(*otelTraceHandler) + if _, doubled := outer.inner.(*otelTraceHandler); doubled { + t.Fatalf("withOtelTraceContext stacked handlers on repeated calls") + } +} + +// TestSafeLogger_AlwaysWrapsOtel guarantees that every logger going +// through the central safe wrapper carries the OTEL trace bridge, so +// individual callers do not have to remember to add it themselves. +func TestSafeLogger_AlwaysWrapsOtel(t *testing.T) { + t.Run("nil_input", func(t *testing.T) { + l := safeLogger(nil) + if l == nil { + t.Fatal("safeLogger(nil) returned nil") + } + if _, ok := l.Handler().(*otelTraceHandler); !ok { + t.Errorf("safeLogger(nil) did not wrap with otelTraceHandler; got %T", + l.Handler()) + } + }) + t.Run("real_input", func(t *testing.T) { + base := slog.New(slog.NewJSONHandler(&bytes.Buffer{}, nil)) + l := safeLogger(base) + if _, ok := l.Handler().(*otelTraceHandler); !ok { + t.Errorf("safeLogger(base) did not wrap with otelTraceHandler; got %T", + l.Handler()) + } + }) +} + +// ---------- test helpers ---------- + +// slogAttrsToMap converts the slog.Attr slice returned by the various +// "...Attrs" helpers into a map[string]string keyed by attribute name. +// Each element of the input is expected to be a single slog.Attr value +// (which is what slog.Any / slog.String produce). The map's values are +// taken from the slog.Value's String() representation so callers can do +// straightforward equality assertions without unwrapping Value kinds. +func slogAttrsToMap(t *testing.T, attrs []any) map[string]string { + t.Helper() + out := make(map[string]string, len(attrs)) + for i, a := range attrs { + attr, ok := a.(slog.Attr) + if !ok { + t.Fatalf("attrs[%d] is %T, want slog.Attr (value=%v)", i, a, a) + } + out[attr.Key] = attr.Value.String() + } + return out +} + +func sortedKeys(m map[string]string) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + return out +} + +func decodeFirstLogLine(t *testing.T, b []byte) map[string]any { + t.Helper() + line := bytes.TrimSpace(b) + if i := bytes.IndexByte(line, '\n'); i >= 0 { + line = line[:i] + } + if len(line) == 0 { + t.Fatalf("no log lines captured") + } + rec := map[string]any{} + if err := json.Unmarshal(line, &rec); err != nil { + t.Fatalf("failed to decode log line %q: %v", line, err) + } + return rec +} diff --git a/go/adbc/driver/flightsql/record_reader.go b/go/adbc/driver/flightsql/record_reader.go index a0c2cc4811..a3252ea54e 100644 --- a/go/adbc/driver/flightsql/record_reader.go +++ b/go/adbc/driver/flightsql/record_reader.go @@ -20,6 +20,7 @@ package flightsql import ( "context" "fmt" + "log/slog" "sync/atomic" "github.com/apache/arrow-adbc/go/adbc" @@ -29,6 +30,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/flight" "github.com/apache/arrow-go/v18/arrow/flight/flightsql" "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/arrow/util" "github.com/bluele/gcache" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -46,9 +48,38 @@ type reader struct { cancelFn context.CancelFunc } +// recordReaderConfig bundles the dependencies that newRecordReader +// needs to spin up its per-endpoint goroutines. Grouping them into a +// single value keeps the call sites compact and lets new fields be +// added without rippling another positional argument through every +// caller. The fields mirror the corresponding members on +// connectionImpl/statement so callers can populate the struct by +// straight field copies. +type recordReaderConfig struct { + alloc memory.Allocator + cl *flightsql.Client + info *flight.FlightInfo + clientCache gcache.Cache + bufferSize int + logger *slog.Logger +} + // kicks off a goroutine for each endpoint and returns a reader which // gathers all of the records as they come in. -func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql.Client, info *flight.FlightInfo, clCache gcache.Cache, bufferSize int, opts ...grpc.CallOption) (rdr array.RecordReader, err error) { +// +// cfg.logger may be nil; in that case a no-op logger is used internally. +// When supplied it receives structured records describing every endpoint +// stream's lifecycle (start, first batch received, completion, failure). +// These records are essential when diagnosing transient stream failures +// such as "[FlightSQL] error reading from server: EOF (Unavailable; DoGet: +// endpoint N: [])" because they record exactly which endpoint failed, how +// many batches/rows had already been received, and how long the stream had +// been open at the time of failure. Without these records the operator +// otherwise has only the bare gRPC EOF to work with, which carries no +// progress or location information. +func newRecordReader(ctx context.Context, cfg recordReaderConfig, opts ...grpc.CallOption) (rdr array.RecordReader, err error) { + log := safeLogger(cfg.logger) + info := cfg.info endpoints := info.Endpoint var header, trailer metadata.MD opts = append(append([]grpc.CallOption{}, opts...), grpc.Header(&header), grpc.Trailer(&trailer)) @@ -60,7 +91,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql. Code: adbc.StatusInternal, } } - schema, err = flight.DeserializeSchema(info.Schema, alloc) + schema, err = flight.DeserializeSchema(info.Schema, cfg.alloc) if err != nil { return nil, adbc.Error{ Msg: "Server returned FlightInfo with invalid schema and no endpoints, cannot read stream", @@ -70,12 +101,18 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql. return array.NewRecordReader(schema, []arrow.RecordBatch{}) } - ch := make(chan arrow.RecordBatch, bufferSize) + ch := make(chan arrow.RecordBatch, cfg.bufferSize) group, ctx := errgroup.WithContext(ctx) ctx, cancelFn := context.WithCancel(ctx) // We may mutate endpoints below numEndpoints := len(endpoints) + log.DebugContext(ctx, "FlightSQL newRecordReader start", + append([]any{ + slog.Int("bufferSize", cfg.bufferSize), + }, flightInfoLogAttrs(info)...)..., + ) + defer func() { if err != nil { close(ch) @@ -84,7 +121,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql. }() if info.Schema != nil { - schema, err = flight.DeserializeSchema(info.Schema, alloc) + schema, err = flight.DeserializeSchema(info.Schema, cfg.alloc) if err != nil { return nil, adbc.Error{ Msg: err.Error(), @@ -92,9 +129,19 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql. } } else { firstEndpoint := endpoints[0] - rdr, err := doGet(ctx, cl, firstEndpoint, clCache, opts...) + epAttrs := endpointLogAttrs(0, numEndpoints, firstEndpoint) + log.DebugContext(ctx, "FlightSQL endpoint stream opening (schema discovery)", epAttrs...) + startSchemaFetch := newStreamProgress() + rdr, err := doGetWithLogger(ctx, cfg.cl, firstEndpoint, cfg.clientCache, log, opts...) if err != nil { - return nil, adbcFromFlightStatusWithDetails(err, header, trailer, "DoGet: endpoint 0: remote: %s", firstEndpoint.Location) + log.ErrorContext(ctx, "FlightSQL endpoint DoGet failed (schema discovery)", + append(append([]any{}, epAttrs...), + "err", err, + "elapsed", startSchemaFetch.summary(), + )..., + ) + return nil, adbcFromFlightStatusWithDetails(err, header, trailer, + "DoGet: endpoint 0: remote: %s [%s]", firstEndpoint.Location, startSchemaFetch.summary()) } schema = rdr.Schema() group.Go(func() error { @@ -103,14 +150,27 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql. defer close(ch) } + progress := newStreamProgress() for rdr.Next() && ctx.Err() == nil { rec := rdr.RecordBatch() + progress.recordBatch(rec.NumRows(), util.TotalRecordSize(rec)) rec.Retain() ch <- rec } if err := checkContext(rdr.Err(), ctx); err != nil { - return adbcFromFlightStatusWithDetails(err, header, trailer, "DoGet: endpoint 0: remote: %s", firstEndpoint.Location) + log.ErrorContext(ctx, "FlightSQL endpoint stream ended with error", + append(append([]any{}, endpointLogAttrs(0, numEndpoints, firstEndpoint)...), + append([]any{"err", err}, progress.logAttrs()...)..., + )..., + ) + return adbcFromFlightStatusWithDetails(err, header, trailer, + "DoGet: endpoint 0: remote: %s [%s]", firstEndpoint.Location, progress.summary()) } + log.DebugContext(ctx, "FlightSQL endpoint stream completed", + append(append([]any{}, endpointLogAttrs(0, numEndpoints, firstEndpoint)...), + progress.logAttrs()..., + )..., + ) return nil }) @@ -133,39 +193,92 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql. for i, ep := range endpoints { endpoint := ep endpointIndex := i - chs[endpointIndex] = make(chan arrow.RecordBatch, bufferSize) + // Offset the endpoint index for the log records to account for endpoint 0 + // having been processed above when info.Schema was unset. + logEndpointIndex := endpointIndex + if info.Schema == nil { + logEndpointIndex = endpointIndex + 1 + } + chs[endpointIndex] = make(chan arrow.RecordBatch, cfg.bufferSize) group.Go(func() error { // Close channels (except the last) so that Next can move on to the next channel properly if endpointIndex != lastChannelIndex { defer close(chs[endpointIndex]) } - rdr, err := doGet(ctx, cl, endpoint, clCache, opts...) + epAttrs := endpointLogAttrs(logEndpointIndex, numEndpoints, endpoint) + log.DebugContext(ctx, "FlightSQL endpoint stream opening", epAttrs...) + doGetStart := newStreamProgress() + rdr, err := doGetWithLogger(ctx, cfg.cl, endpoint, cfg.clientCache, log, opts...) if err != nil { - return adbcFromFlightStatusWithDetails(err, header, trailer, "DoGet: endpoint %d: %s", endpointIndex, endpoint.Location) + log.ErrorContext(ctx, "FlightSQL endpoint DoGet failed", + append(append([]any{}, epAttrs...), + "err", err, + "elapsed", doGetStart.summary(), + )..., + ) + return adbcFromFlightStatusWithDetails(err, header, trailer, + "DoGet: endpoint %d: %s [%s]", logEndpointIndex, endpoint.Location, doGetStart.summary()) } defer rdr.Release() streamSchema := utils.RemoveSchemaMetadata(rdr.Schema()) if !streamSchema.Equal(referenceSchema) { - return fmt.Errorf("endpoint %d returned inconsistent schema: expected %s but got %s", endpointIndex, referenceSchema.String(), streamSchema.String()) + log.ErrorContext(ctx, "FlightSQL endpoint returned inconsistent schema", + append(append([]any{}, epAttrs...), + "expectedSchema", referenceSchema.String(), + "actualSchema", streamSchema.String(), + )..., + ) + return fmt.Errorf("endpoint %d returned inconsistent schema: expected %s but got %s", logEndpointIndex, referenceSchema.String(), streamSchema.String()) } + progress := newStreamProgress() for rdr.Next() && ctx.Err() == nil { rec := rdr.RecordBatch() + progress.recordBatch(rec.NumRows(), util.TotalRecordSize(rec)) rec.Retain() chs[endpointIndex] <- rec } if err := checkContext(rdr.Err(), ctx); err != nil { - return adbcFromFlightStatusWithDetails(err, header, trailer, "DoGet: endpoint %d: %s", endpointIndex, endpoint.Location) + log.ErrorContext(ctx, "FlightSQL endpoint stream ended with error", + append(append([]any{}, epAttrs...), + append([]any{"err", err}, progress.logAttrs()...)..., + )..., + ) + return adbcFromFlightStatusWithDetails(err, header, trailer, + "DoGet: endpoint %d: %s [%s]", logEndpointIndex, endpoint.Location, progress.summary()) } + log.DebugContext(ctx, "FlightSQL endpoint stream completed", + append(append([]any{}, epAttrs...), + progress.logAttrs()..., + )..., + ) return nil }) } go func() { - reader.err = group.Wait() + err := group.Wait() + // Surface the statement/connection identifiers (if any were + // propagated through ctx by the caller) on the error returned via + // reader.Err() so the most common production failure mode — a + // mid-stream EOF reported only after the reader has been handed + // off to client code — carries the same correlation IDs that a + // synchronous ExecuteQuery error would. + stmtID, connID := operationIDsFromCtx(ctx) + reader.err = withOperationIDs(err, stmtID, connID) + if reader.err != nil { + log.WarnContext(ctx, "FlightSQL record reader finished with error", + "err", reader.err, + "numEndpoints", numEndpoints, + ) + } else { + log.DebugContext(ctx, "FlightSQL record reader finished successfully", + "numEndpoints", numEndpoints, + ) + } // Don't close the last channel until after the group is finished, so that // Next() can only return after reader.err may have been set close(chs[lastChannelIndex]) diff --git a/go/adbc/driver/flightsql/record_reader_test.go b/go/adbc/driver/flightsql/record_reader_test.go index ab11e05533..ab7b5f1794 100644 --- a/go/adbc/driver/flightsql/record_reader_test.go +++ b/go/adbc/driver/flightsql/record_reader_test.go @@ -160,7 +160,13 @@ func (suite *RecordReaderTests) TestFallbackFailedConnection() { }, } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() @@ -190,7 +196,13 @@ func (suite *RecordReaderTests) TestFallbackFailedDoGet() { }, } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() @@ -204,7 +216,13 @@ func (suite *RecordReaderTests) TestFallbackFailedDoGet() { // Not enough retries suite.service.failureCount = 4 - reader, err = newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err = newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() suite.False(reader.Next()) @@ -223,7 +241,13 @@ func (suite *RecordReaderTests) TestFallbackFailed() { }, } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() @@ -236,7 +260,13 @@ func (suite *RecordReaderTests) TestNoEndpoints() { Schema: flight.SerializeSchema(orderingSchema(), suite.alloc), } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() @@ -248,7 +278,13 @@ func (suite *RecordReaderTests) TestNoEndpoints() { func (suite *RecordReaderTests) TestNoEndpointsNoSchema() { info := flight.FlightInfo{} - _, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + _, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.ErrorContains(err, "Server returned FlightInfo with no schema and no endpoints, cannot read stream") } @@ -257,7 +293,13 @@ func (suite *RecordReaderTests) TestNoEndpointsInvalidSchema() { Schema: []byte("f"), } - _, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + _, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.ErrorContains(err, "Server returned FlightInfo with invalid schema and no endpoints, cannot read stream") } @@ -272,7 +314,13 @@ func (suite *RecordReaderTests) TestNoSchema() { }, } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() @@ -305,7 +353,13 @@ func (suite *RecordReaderTests) TestSchemaEndpointMismatch() { }, } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() @@ -339,7 +393,13 @@ func (suite *RecordReaderTests) TestOrdering() { }, } - reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3) + reader, err := newRecordReader(context.Background(), recordReaderConfig{ + alloc: suite.alloc, + cl: suite.cl, + info: &info, + clientCache: suite.clCache, + bufferSize: 3, + }) suite.NoError(err) defer reader.Release() diff --git a/go/adbc/driver/flightsql/utils.go b/go/adbc/driver/flightsql/utils.go index 4eeddd8eb1..3103c6debe 100644 --- a/go/adbc/driver/flightsql/utils.go +++ b/go/adbc/driver/flightsql/utils.go @@ -85,6 +85,20 @@ func adbcFromFlightStatusWithDetails(err error, header, trailer metadata.MD, con } details := []adbc.ErrorDetail{} + // Surface the canonical gRPC status code and message as their own + // structured detail entries. The same values appear inside the + // formatted Msg (and as "grpc_code"/"grpc_message" slog attributes + // in the structured log stream), but applications that consume + // adbc.Error.Details directly can filter on them without parsing + // the formatted message — and operators triaging a bug report can + // join the host-application error log against the driver's log + // stream on the same key. + details = append(details, + &adbc.TextErrorDetail{Name: "grpc_code", Detail: grpcStatus.Code().String()}) + if gmsg := grpcStatus.Message(); gmsg != "" { + details = append(details, + &adbc.TextErrorDetail{Name: "grpc_message", Detail: gmsg}) + } for _, detail := range grpcStatus.Proto().Details { details = append(details, &anyErrorDetail{name: "grpc-status-details-bin", message: detail}) } @@ -135,13 +149,40 @@ func adbcFromFlightStatusWithDetails(err error, header, trailer metadata.MD, con return adbc.Error{ // People don't read error messages, so backload the context and frontload the server error - Msg: fmt.Sprintf("[FlightSQL] %s (%s; %s)", grpcStatus.Message(), grpcStatus.Code(), fmt.Sprintf(context, args...)), + Msg: fmt.Sprintf("[FlightSQL] %s (%s; %s%s)", grpcStatus.Message(), grpcStatus.Code(), fmt.Sprintf(context, args...), eofHint(err, grpcStatus.Code())), Code: adbcCode, VendorCode: int32(grpcStatus.Code()), Details: details, } } +// eofHint returns a short diagnostic hint that is appended to the error +// message produced by adbcFromFlightStatusWithDetails when the underlying +// failure looks like a server-side stream termination ("error reading from +// server: EOF" with a gRPC Unavailable code). The hint enumerates the most +// common operator-actionable root causes so that the message left in logs +// is self-describing without requiring source-code lookups. It returns an +// empty string for any error that does not match the EOF pattern so we do +// not pollute normal error messages. +func eofHint(err error, code codes.Code) string { + if err == nil { + return "" + } + msg := err.Error() + if code != codes.Unavailable { + return "" + } + // Match on the substring rather than equality because the gRPC error + // chain wraps the underlying transport error in its own message format + // (e.g. "rpc error: code = Unavailable desc = error reading from server: EOF"). + if !strings.Contains(msg, "error reading from server: EOF") && !strings.Contains(msg, "transport is closing") { + return "" + } + return "; possible causes: server crashed or restarted, server-side timeout/idle disconnect, " + + "load balancer or proxy idle timeout, network interruption, server out-of-memory while serving the stream, " + + "or client read timeout shorter than server processing time" +} + func checkContext(maybeErr error, ctx context.Context) error { if maybeErr != nil && !errors.Is(maybeErr, io.EOF) { return maybeErr diff --git a/go/adbc/driver/internal/driverbase/database.go b/go/adbc/driver/internal/driverbase/database.go index 16a4edb807..639ae36f6f 100644 --- a/go/adbc/driver/internal/driverbase/database.go +++ b/go/adbc/driver/internal/driverbase/database.go @@ -126,6 +126,78 @@ func NewDatabaseImplBase(ctx context.Context, driver *DriverImplBase) (DatabaseI return database, err } +// TracingOptions bundles the configuration knobs that a driver may want +// to forward to driverbase's OpenTelemetry tracer initialization without +// requiring each new knob to widen the InitTracing function signature. +// +// All fields are optional. When a field is the zero value the +// corresponding default (typically the OTEL environment variable or a +// platform-appropriate default path) is used. This struct exists so +// drivers can honor database-level options loaded from a TOML profile +// or otherwise supplied through the driver manager. +type TracingOptions struct { + // ExporterName overrides the OTEL_TRACES_EXPORTER environment + // variable. Must match one of the values understood by + // tryParseTraceExporterType ("none", "otlp", "console", "adbcfile") + // when non-empty. Empty falls back to the environment variable. + ExporterName string + + // TracingFolderPath overrides the default on-disk folder used by + // the "adbcfile" exporter (which is otherwise + // "/.adbc/traces"). Ignored for non-file + // exporters. + TracingFolderPath string +} + +// NewDatabaseImplBaseWithExporter instantiates DatabaseImplBase and +// initializes its OpenTelemetry tracer using the supplied exporter name +// instead of (or before falling back to) the OTEL_TRACES_EXPORTER +// environment variable. See NewDatabaseImplBaseWithOptions for a +// constructor that also accepts a custom on-disk folder for the +// "adbcfile" exporter. +func NewDatabaseImplBaseWithExporter( + ctx context.Context, + driver *DriverImplBase, + exporterName string, +) (DatabaseImplBase, error) { + return NewDatabaseImplBaseWithOptions(ctx, driver, TracingOptions{ExporterName: exporterName}) +} + +// NewDatabaseImplBaseWithOptions instantiates DatabaseImplBase and +// initializes its OpenTelemetry tracer using the supplied TracingOptions +// instead of relying solely on environment variables and default paths. +// +// Empty fields on opts fall back to the same defaults that +// NewDatabaseImplBase uses (OTEL_TRACES_EXPORTER for the exporter and +// "/.adbc/traces" for the adbcfile folder). +// +// This constructor exists so drivers can honor database-level options +// (such as adbc.OptionKeyTelemetryTracesExporter and +// adbc.OptionKeyTelemetryTracesFolderPath) loaded from a TOML profile +// or otherwise supplied through the driver manager, without requiring +// the operator to mutate the host process's environment or filesystem +// defaults. +func NewDatabaseImplBaseWithOptions( + ctx context.Context, + driver *DriverImplBase, + opts TracingOptions, +) (DatabaseImplBase, error) { + database := DatabaseImplBase{ + Alloc: driver.Alloc, + ErrorHelper: driver.ErrorHelper, + DriverInfo: driver.DriverInfo, + Logger: nilLogger(), + Tracer: nilTracer(), + } + err := database.InitTracingWithOptions( + ctx, + driver.DriverInfo.GetName(), + getDriverVersion(driver.DriverInfo), + opts, + ) + return database, err +} + func (base *DatabaseImplBase) Base() *DatabaseImplBase { return base } @@ -232,10 +304,54 @@ func (base *database) InitTracing(ctx context.Context, driverName string, driver return base.Base().InitTracing(ctx, driverName, driverVersion) } -func (base *DatabaseImplBase) InitTracing(ctx context.Context, driverName string, driverVersion string) (err error) { +func (base *DatabaseImplBase) InitTracing(ctx context.Context, driverName string, driverVersion string) error { + return base.InitTracingWithOptions(ctx, driverName, driverVersion, TracingOptions{}) +} + +// InitTracingWithExporter initializes the database's OpenTelemetry tracer +// using the supplied exporter name. See InitTracingWithOptions for the +// fully parameterized variant; this helper is preserved for callers that +// only need to override the exporter selection. +func (base *DatabaseImplBase) InitTracingWithExporter( + ctx context.Context, + driverName string, + driverVersion string, + exporterName string, +) error { + return base.InitTracingWithOptions( + ctx, + driverName, + driverVersion, + TracingOptions{ExporterName: exporterName}, + ) +} + +// InitTracingWithOptions initializes the database's OpenTelemetry tracer +// using the supplied TracingOptions. Empty fields fall back to the same +// defaults that InitTracing uses: the OTEL_TRACES_EXPORTER environment +// variable for the exporter selection, and "/.adbc/traces" +// for the on-disk folder used by the "adbcfile" exporter. +// +// An explicit value on opts takes precedence over the corresponding +// default; this lets a driver honor per-database options (loaded from a +// TOML profile or otherwise supplied through the driver manager) +// without mutating the host process's environment or filesystem layout. +func (base *DatabaseImplBase) InitTracingWithOptions( + ctx context.Context, + driverName string, + driverVersion string, + opts TracingOptions, +) (err error) { fullyQualifiedDriverName := driverNamespace + "." + driverName - exporterName := getExporterName() + // An explicit exporter passed in via opts overrides whatever was + // found in the environment; this lets a driver honor a value loaded + // from a TOML profile or supplied as a database option without + // having to mutate the process's environment block. + exporterName := opts.ExporterName + if exporterName == "" { + exporterName = getExporterName() + } // Empty exporter if exporterName == "" { @@ -253,6 +369,7 @@ func (base *DatabaseImplBase) InitTracing(ctx context.Context, driverName string exporterName, base, driverName, + opts, ) if err != nil { return @@ -280,6 +397,7 @@ func getExporters( exporterName string, base *DatabaseImplBase, driverName string, + opts TracingOptions, ) (exporters []sdktrace.SpanExporter, exporterType traceExporterType, err error) { var exporter sdktrace.SpanExporter exporterType, ok := tryParseTraceExporterType(exporterName) @@ -307,7 +425,7 @@ func getExporters( return } case TraceExporterAdbcFile: - exporter, err = newAdbcFileExporter(driverName) + exporter, err = newAdbcFileExporter(driverName, opts.TracingFolderPath) if err != nil { return } @@ -397,9 +515,18 @@ func newOtlpTraceExporters(ctx context.Context) ([]sdktrace.SpanExporter, error) return []sdktrace.SpanExporter{grpcExporter, httpExporter}, nil } -func newAdbcFileExporter(driverName string) (*stdouttrace.Exporter, error) { +func newAdbcFileExporter(driverName, folderPath string) (*stdouttrace.Exporter, error) { fullyQualifiedDriverName := strings.ToLower(driverNamespace + "." + driverName) - fileWriter, err := NewRotatingFileWriter(WithLogNamePrefix(fullyQualifiedDriverName)) + writerOpts := []rotatingFileWriterOption{WithLogNamePrefix(fullyQualifiedDriverName)} + // An explicit folder path takes precedence over the default of + // "/.adbc/traces"; this lets operators route trace + // files into a location their support workflow already collects + // (such as a shared diagnostics share) via a TOML profile or + // database option, without depending on the local default. + if strings.TrimSpace(folderPath) != "" { + writerOpts = append(writerOpts, WithTracingFolderPath(folderPath)) + } + fileWriter, err := NewRotatingFileWriter(writerOpts...) if err != nil { return nil, err } diff --git a/go/adbc/drivermgr/adbc_driver_manager.cc b/go/adbc/drivermgr/adbc_driver_manager.cc index be20a9e3e0..9ac56c83d1 100644 --- a/go/adbc/drivermgr/adbc_driver_manager.cc +++ b/go/adbc/drivermgr/adbc_driver_manager.cc @@ -16,11 +16,19 @@ // under the License. #if defined(_WIN32) -#include // Must come first - +// These version macros gate which Win32 APIs the SDK headers declare. They MUST +// be set before is included -- once windows.h pulls in winnt.h, the +// internal API-availability macros are fixed and later #defines have no effect. +// In particular, SHGetKnownFolderPath (used below) requires _WIN32_WINNT >= 0x0600 +// (Vista). Without this, builds with toolchains that default _WIN32_WINNT below +// Vista (e.g. TDM-GCC 10.x) fail with "SHGetKnownFolderPath was not declared". +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0A00 // Windows 10 +#endif #ifndef NTDDI_VERSION #define NTDDI_VERSION 0x0A00000C // For SHGetKnownFolderPath in ShlObj_core.h in ShlObj.h #endif +#include // Must come first #include #include diff --git a/go/adbc/go.mod b/go/adbc/go.mod index 5e2696ba7c..6a8d0a3ef8 100644 --- a/go/adbc/go.mod +++ b/go/adbc/go.mod @@ -62,6 +62,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.26 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/stoewer/go-strcase v1.3.1 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/zeebo/xxh3 v1.1.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect diff --git a/go/adbc/go.sum b/go/adbc/go.sum index acc014743b..05c0eab7ba 100644 --- a/go/adbc/go.sum +++ b/go/adbc/go.sum @@ -12,6 +12,8 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= @@ -51,14 +53,23 @@ github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOF github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stoewer/go-strcase v1.3.1 h1:iS0MdW+kVTxgMoE1LAZyMiYJFKlOzLooE4MxjirtkAs= +github.com/stoewer/go-strcase v1.3.1/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= @@ -119,6 +130,7 @@ google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= modernc.org/cc/v4 v4.27.3 h1:uNCgn37E5U09mTv1XgskEVUJ8ADKpmFMPxzGJ0TSo+U=