From 2a72155ed2e8e42a70883fc52d16551d6acc891f Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Tue, 2 Jun 2026 11:33:29 +0200 Subject: [PATCH] Emit query latency metrics Add a query duration helper and instrument source/target query paths with millisecond latency histograms. Metrics emitted: - query.duration_milliseconds tagged with side, kind, and outcome Initial coverage includes chunk copy, binlog apply, range select, and exact row count queries. This lets dashboards compare source and target query latency side-by-side and break down latency by query kind. Add unit coverage for the query duration metric helper. --- go/logic/applier.go | 12 +++++++++++ go/logic/inspect.go | 4 ++++ go/metrics/emit.go | 12 +++++++++++ go/metrics/emit_test.go | 45 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index 3bf32e124..3d88acc0a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -16,6 +16,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/sql" "context" @@ -893,8 +894,10 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, err } + queryStartTime := time.Now() rows, err := apl.db.Query(query, explodedArgs...) if err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } defer rows.Close() @@ -902,13 +905,16 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } hasFurtherRange = true } if err = rows.Err(); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil) if hasFurtherRange { apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues return hasFurtherRange, nil @@ -956,7 +962,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } + queryStartTime := time.Now() result, err := tx.Exec(query, explodedArgs...) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err) if err != nil { return nil, err } @@ -1669,13 +1677,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e // in the batch. SHOW WARNINGS only shows warnings from the last statement in a // multi-statement query, so we interleave SHOW WARNINGS after each DML statement. if apl.migrationContext.PanicOnWarnings { + queryStartTime := time.Now() totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err) if err != nil { return rollback(err) } } else { // Fast path: batch together DML queries into multi-statements to minimize network trips. // We use the raw driver connection to access the rows affected for each statement. + queryStartTime := time.Now() execErr := conn.Raw(func(driverConn any) error { ex := driverConn.(driver.ExecerContext) nvc := driverConn.(driver.NamedValueChecker) @@ -1709,6 +1720,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e return nil }) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr) if execErr != nil { return rollback(execErr) } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 96aadd672..982be1d98 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -16,6 +16,7 @@ import ( "time" "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error { query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) var rowsEstimate int64 + queryStartTime := time.Now() if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) return mysql.Kill(isp.db, connectionID) } return err } + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil) // row count query finished. nil out the cancel func, so the main migration thread // doesn't bother calling it after row copy is done. diff --git a/go/metrics/emit.go b/go/metrics/emit.go index 86dc97556..e9952ecd0 100644 --- a/go/metrics/emit.go +++ b/go/metrics/emit.go @@ -129,3 +129,15 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) { emit.Histogram("throttle.duration_seconds", duration.Seconds(), tags...) emit.Count("throttle.events_total", 1, tags...) } + +// RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags. +func RecordQueryDuration(emit Emitter, side string, kind string, duration time.Duration, err error) { + if emit == nil || side == "" || kind == "" || duration < 0 { + return + } + outcome := "ok" + if err != nil { + outcome = "error" + } + emit.Histogram("query.duration_milliseconds", float64(duration.Milliseconds()), "side:"+side, "kind:"+kind, "outcome:"+outcome) +} diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go index 695d809d9..a9d4f888f 100644 --- a/go/metrics/emit_test.go +++ b/go/metrics/emit_test.go @@ -7,7 +7,9 @@ package metrics import ( "context" + "errors" "runtime" + "slices" "testing" "time" ) @@ -256,3 +258,46 @@ func TestEmitThrottleIntervalNilSafe(t *testing.T) { EmitThrottleInterval(nil, time.Second, "test") EmitThrottleInterval(&gaugeSpy{}, time.Second, "test") } + +type histogramSpy struct { + names []string + values []float64 + tags [][]string +} + +func (h *histogramSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (h *histogramSpy) Count(_ string, _ int64, _ ...string) {} + +func (h *histogramSpy) Histogram(name string, value float64, tags ...string) { + h.names = append(h.names, name) + h.values = append(h.values, value) + h.tags = append(h.tags, tags) +} + +func TestRecordQueryDuration(t *testing.T) { + spy := &histogramSpy{} + + RecordQueryDuration(spy, "source", "row_count", 1500*time.Millisecond, nil) + RecordQueryDuration(spy, "target", "binlog_apply", 2*time.Second, errors.New("boom")) + + if len(spy.names) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.names)) + } + if spy.names[0] != "query.duration_milliseconds" || spy.values[0] != 1500 { + t.Fatalf("got %s=%v, want query.duration_milliseconds=1500", spy.names[0], spy.values[0]) + } + if !slices.Equal(spy.tags[0], []string{"side:source", "kind:row_count", "outcome:ok"}) { + t.Fatalf("got tags %#v", spy.tags[0]) + } + if spy.values[1] != 2000 || !slices.Equal(spy.tags[1], []string{"side:target", "kind:binlog_apply", "outcome:error"}) { + t.Fatalf("got second metric value=%v tags=%#v", spy.values[1], spy.tags[1]) + } +} + +func TestRecordQueryDurationNilSafe(t *testing.T) { + RecordQueryDuration(nil, "source", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "row_count", -time.Second, nil) +}