diff --git a/go/logic/applier.go b/go/logic/applier.go index 3bf32e124..3d88acc0a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -16,6 +16,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/sql" "context" @@ -893,8 +894,10 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, err } + queryStartTime := time.Now() rows, err := apl.db.Query(query, explodedArgs...) if err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } defer rows.Close() @@ -902,13 +905,16 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } hasFurtherRange = true } if err = rows.Err(); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil) if hasFurtherRange { apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues return hasFurtherRange, nil @@ -956,7 +962,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } + queryStartTime := time.Now() result, err := tx.Exec(query, explodedArgs...) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err) if err != nil { return nil, err } @@ -1669,13 +1677,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e // in the batch. SHOW WARNINGS only shows warnings from the last statement in a // multi-statement query, so we interleave SHOW WARNINGS after each DML statement. if apl.migrationContext.PanicOnWarnings { + queryStartTime := time.Now() totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err) if err != nil { return rollback(err) } } else { // Fast path: batch together DML queries into multi-statements to minimize network trips. // We use the raw driver connection to access the rows affected for each statement. + queryStartTime := time.Now() execErr := conn.Raw(func(driverConn any) error { ex := driverConn.(driver.ExecerContext) nvc := driverConn.(driver.NamedValueChecker) @@ -1709,6 +1720,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e return nil }) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr) if execErr != nil { return rollback(execErr) } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 96aadd672..982be1d98 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -16,6 +16,7 @@ import ( "time" "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error { query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) var rowsEstimate int64 + queryStartTime := time.Now() if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) return mysql.Kill(isp.db, connectionID) } return err } + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil) // row count query finished. nil out the cancel func, so the main migration thread // doesn't bother calling it after row copy is done. diff --git a/go/metrics/emit.go b/go/metrics/emit.go index ed9c3a506..f0357daca 100644 --- a/go/metrics/emit.go +++ b/go/metrics/emit.go @@ -130,6 +130,18 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) { emit.Count("throttle.events_total", 1, tags...) } +// RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags. +func RecordQueryDuration(emit Emitter, side string, kind string, duration time.Duration, err error) { + if emit == nil || side == "" || kind == "" || duration < 0 { + return + } + outcome := "ok" + if err != nil { + outcome = "error" + } + emit.Histogram("query.duration_milliseconds", float64(duration.Milliseconds()), "side:"+side, "kind:"+kind, "outcome:"+outcome) +} + // RecordSleep emits per-stage sleep/wait metrics (namespace is applied by the client): // gh_ost.sleep.duration_milliseconds and gh_ost.sleep.total_milliseconds, both tagged by stage. func RecordSleep(emit Emitter, stage string, d time.Duration) { diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go index 48de64c8b..657e55099 100644 --- a/go/metrics/emit_test.go +++ b/go/metrics/emit_test.go @@ -7,6 +7,7 @@ package metrics import ( "context" + "errors" "runtime" "slices" "testing" @@ -258,6 +259,49 @@ func TestEmitThrottleIntervalNilSafe(t *testing.T) { EmitThrottleInterval(&gaugeSpy{}, time.Second, "test") } +type histogramSpy struct { + names []string + values []float64 + tags [][]string +} + +func (h *histogramSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (h *histogramSpy) Count(_ string, _ int64, _ ...string) {} + +func (h *histogramSpy) Histogram(name string, value float64, tags ...string) { + h.names = append(h.names, name) + h.values = append(h.values, value) + h.tags = append(h.tags, tags) +} + +func TestRecordQueryDuration(t *testing.T) { + spy := &histogramSpy{} + + RecordQueryDuration(spy, "source", "row_count", 1500*time.Millisecond, nil) + RecordQueryDuration(spy, "target", "binlog_apply", 2*time.Second, errors.New("boom")) + + if len(spy.names) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.names)) + } + if spy.names[0] != "query.duration_milliseconds" || spy.values[0] != 1500 { + t.Fatalf("got %s=%v, want query.duration_milliseconds=1500", spy.names[0], spy.values[0]) + } + if !slices.Equal(spy.tags[0], []string{"side:source", "kind:row_count", "outcome:ok"}) { + t.Fatalf("got tags %#v", spy.tags[0]) + } + if spy.values[1] != 2000 || !slices.Equal(spy.tags[1], []string{"side:target", "kind:binlog_apply", "outcome:error"}) { + t.Fatalf("got second metric value=%v tags=%#v", spy.values[1], spy.tags[1]) + } +} + +func TestRecordQueryDurationNilSafe(t *testing.T) { + RecordQueryDuration(nil, "source", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "row_count", -time.Second, nil) +} + type sleepSpy struct { histogramNames []string histogramValues []float64