Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/sql"

"context"
Expand Down Expand Up @@ -893,22 +894,27 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool
return hasFurtherRange, err
}

queryStartTime := time.Now()
rows, err := apl.db.Query(query, explodedArgs...)
if err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil)
if hasFurtherRange {
apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
Expand Down Expand Up @@ -956,7 +962,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}
queryStartTime := time.Now()
result, err := tx.Exec(query, explodedArgs...)
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1669,13 +1677,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
if apl.migrationContext.PanicOnWarnings {
queryStartTime := time.Now()
totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults)
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err)
if err != nil {
return rollback(err)
}
} else {
// Fast path: batch together DML queries into multi-statements to minimize network trips.
// We use the raw driver connection to access the rows affected for each statement.
queryStartTime := time.Now()
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)
Expand Down Expand Up @@ -1709,6 +1720,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
return nil
})

metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr)
if execErr != nil {
return rollback(execErr)
}
Expand Down
4 changes: 4 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

Expand Down Expand Up @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error {

query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName))
var rowsEstimate int64
queryStartTime := time.Now()
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return mysql.Kill(isp.db, connectionID)
}
return err
}
metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil)

// row count query finished. nil out the cancel func, so the main migration thread
// doesn't bother calling it after row copy is done.
Expand Down
12 changes: 12 additions & 0 deletions go/metrics/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) {
emit.Count("throttle.events_total", 1, tags...)
}

// RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags.
// The metric name handed to the emitter is the un-prefixed "query.duration_milliseconds".
//
// side and kind are passed through verbatim as the tags "side:<side>" and
// "kind:<kind>". The outcome tag is "outcome:error" when err is non-nil and
// "outcome:ok" otherwise.
//
// The call is a no-op when emit is nil, when side or kind is empty, or when
// duration is negative.
func RecordQueryDuration(emit Emitter, side string, kind string, duration time.Duration, err error) {
	if emit == nil || side == "" || kind == "" || duration < 0 {
		return
	}
	outcome := "ok"
	if err != nil {
		outcome = "error"
	}
	// Duration.Milliseconds() truncates to a whole number, which would record every
	// sub-millisecond query as 0. Float division keeps the fractional part while
	// yielding the same value for whole-millisecond durations.
	millis := float64(duration) / float64(time.Millisecond)
	emit.Histogram("query.duration_milliseconds", millis, "side:"+side, "kind:"+kind, "outcome:"+outcome)
}

// RecordSleep emits per-stage sleep/wait metrics (namespace is applied by the client):
// gh_ost.sleep.duration_milliseconds and gh_ost.sleep.total_milliseconds, both tagged by stage.
func RecordSleep(emit Emitter, stage string, d time.Duration) {
Expand Down
44 changes: 44 additions & 0 deletions go/metrics/emit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package metrics

import (
"context"
"errors"
"runtime"
"slices"
"testing"
Expand Down Expand Up @@ -258,6 +259,49 @@ func TestEmitThrottleIntervalNilSafe(t *testing.T) {
EmitThrottleInterval(&gaugeSpy{}, time.Second, "test")
}

// histogramSpy is a test double that records every Histogram call it receives.
// The three slices are index-aligned: entry i of each describes the i-th call.
type histogramSpy struct {
	names  []string   // metric names, in call order
	values []float64  // observed values, in call order
	tags   [][]string // tag lists, in call order
}

// Gauge discards its arguments; gauges are not recorded by this spy.
func (spy *histogramSpy) Gauge(string, float64, ...string) {}

// Count discards its arguments; counters are not recorded by this spy.
func (spy *histogramSpy) Count(string, int64, ...string) {}

// Histogram appends the name, value and tags of the call to the spy's slices.
func (spy *histogramSpy) Histogram(metric string, observed float64, labels ...string) {
	spy.names, spy.values, spy.tags =
		append(spy.names, metric),
		append(spy.values, observed),
		append(spy.tags, labels)
}

// TestRecordQueryDuration records one successful and one failed query and checks
// the histogram name, the millisecond value and the side/kind/outcome tags that
// reach the emitter.
func TestRecordQueryDuration(t *testing.T) {
	recorder := &histogramSpy{}

	RecordQueryDuration(recorder, "source", "row_count", 1500*time.Millisecond, nil)
	RecordQueryDuration(recorder, "target", "binlog_apply", 2*time.Second, errors.New("boom"))

	if emitted := len(recorder.names); emitted != 2 {
		t.Fatalf("got %d histograms, want 2", emitted)
	}

	// First call: nil error, so the outcome tag must be "ok".
	firstName, firstValue := recorder.names[0], recorder.values[0]
	if !(firstName == "query.duration_milliseconds" && firstValue == 1500) {
		t.Fatalf("got %s=%v, want query.duration_milliseconds=1500", firstName, firstValue)
	}
	wantOKTags := []string{"side:source", "kind:row_count", "outcome:ok"}
	if !slices.Equal(recorder.tags[0], wantOKTags) {
		t.Fatalf("got tags %#v", recorder.tags[0])
	}

	// Second call: non-nil error, so the outcome tag must be "error".
	wantErrTags := []string{"side:target", "kind:binlog_apply", "outcome:error"}
	secondValue, secondTags := recorder.values[1], recorder.tags[1]
	if secondValue != 2000 || !slices.Equal(secondTags, wantErrTags) {
		t.Fatalf("got second metric value=%v tags=%#v", secondValue, secondTags)
	}
}

// TestRecordQueryDurationNilSafe checks that RecordQueryDuration tolerates a nil
// emitter and emits nothing when side is empty, kind is empty, or the duration
// is negative.
func TestRecordQueryDurationNilSafe(t *testing.T) {
	// With a nil emitter there is nothing to observe; the call just must not panic.
	RecordQueryDuration(nil, "source", "row_count", time.Second, nil)

	// The spy never panics, so the invalid-input cases are only meaningful
	// if we assert that no histogram was actually recorded.
	spy := &histogramSpy{}
	RecordQueryDuration(spy, "", "row_count", time.Second, nil)
	RecordQueryDuration(spy, "source", "", time.Second, nil)
	RecordQueryDuration(spy, "source", "row_count", -time.Second, nil)
	if len(spy.names) != 0 {
		t.Fatalf("got %d histograms for invalid input, want 0: %v", len(spy.names), spy.names)
	}
}

type sleepSpy struct {
histogramNames []string
histogramValues []float64
Expand Down
Loading