From 629ec4cc7d5bda7596c2b23ca1485ec7687a9908 Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 12 Jun 2026 08:38:22 -0500 Subject: [PATCH] Add support for metrics for time to lock jobs and locked count Here, build on the proposal in [1] to add a metric hook to River, and start emitting metrics for the time to lock jobs and the number of jobs locked. Especially the first metric is generally very useful for looking for queue degradation due to dead tuples. [1] https://github.com/riverqueue/river/pull/1285 --- CHANGELOG.md | 4 ++ otelriver/example_middleware_test.go | 9 +++- otelriver/middleware.go | 32 ++++++++++++- otelriver/middleware_test.go | 72 ++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d8ef8f..1f2074b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `otelriver` hook support for River producer metrics, including `river.job_get_available_duration` and `river.job_get_available_count`. + ## [0.10.0] - 2026-06-06 ### Added diff --git a/otelriver/example_middleware_test.go b/otelriver/example_middleware_test.go index ce18a39..8030a01 100644 --- a/otelriver/example_middleware_test.go +++ b/otelriver/example_middleware_test.go @@ -12,12 +12,19 @@ import ( ) func ExampleMiddleware() { + middleware := otelriver.NewMiddleware(nil) + _, err := river.NewClient(riverpgxv5.New(nil), &river.Config{ + Hooks: []rivertype.Hook{ + // Install the OpenTelemetry middleware as a hook to emit producer + // metrics from this River client. + middleware, + }, Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})), Middleware: []rivertype.Middleware{ // Install the OpenTelemetry middleware to run for all jobs inserted // or worked by this River client. - otelriver.NewMiddleware(nil), + middleware, }, TestOnly: true, // suitable only for use in tests; remove for live environments }) diff --git a/otelriver/middleware.go b/otelriver/middleware.go index 63173f3..01bd695 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -58,9 +58,10 @@ type MiddlewareConfig struct { TracerProvider trace.TracerProvider } -// Middleware is a River middleware that emits OpenTelemetry metrics when jobs -// are inserted or worked. +// Middleware is a River middleware and hook that emits OpenTelemetry traces and +// metrics. type Middleware struct { + river.HookDefaults river.MiddlewareDefaults config *MiddlewareConfig @@ -75,6 +76,8 @@ type middlewareMetrics struct { insertManyCount metric.Int64Counter insertManyDuration metric.Float64Gauge insertManyDurationHistogram metric.Float64Histogram + jobGetAvailableDuration metric.Float64Histogram + jobGetAvailableCount metric.Int64Histogram messagingClientConsumedMessages metric.Int64Counter messagingClientOperationDuration metric.Float64Histogram messagingClientSentMessages metric.Int64Counter @@ -117,6 +120,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware { insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), + jobGetAvailableDuration: mustFloat64Histogram(meter, prefix+"job_get_available_duration", metric.WithDescription("Duration of successful JobGetAvailable calls"), metric.WithUnit(durationUnit)), + jobGetAvailableCount: mustInt64Histogram(meter, prefix+"job_get_available_count", metric.WithDescription("Number of jobs locked by successful JobGetAvailable calls"), metric.WithUnit("{job}")), workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), @@ -213,6 +218,21 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job return insertRes, err } +func (m *Middleware) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) { + if params == nil { + return + } + + switch riverMetric := params.Metric.(type) { + case *rivertype.JobGetAvailableDurationMetric: + m.metrics.jobGetAvailableDuration.Record(ctx, m.durationInPreferredUnit(riverMetric.Duration), + metric.WithAttributes(attribute.String("queue", riverMetric.Queue))) + case *rivertype.JobGetAvailableCountMetric: + m.metrics.jobGetAvailableCount.Record(ctx, int64(riverMetric.Count), + metric.WithAttributes(attribute.String("queue", riverMetric.Queue))) + } +} + func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { spanName := prefix + "work" if m.config.EnableWorkSpanJobKindSuffix { @@ -328,6 +348,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo return metric } +func mustInt64Histogram(meter metric.Meter, name string, options ...metric.Int64HistogramOption) metric.Int64Histogram { + metric, err := meter.Int64Histogram(name, options...) + if err != nil { + panic(err) + } + return metric +} + func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter { metric, err := meter.Int64Counter(name, options...) if err != nil { diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 9b35758..9b2c905 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -22,6 +22,7 @@ import ( // Verify interface compliance. var ( _ rivertype.JobInsertMiddleware = &Middleware{} + _ rivertype.HookMetricEmit = &Middleware{} _ rivertype.WorkerMiddleware = &Middleware{} ) @@ -58,6 +59,68 @@ func TestMiddleware(t *testing.T) { return setupConfig(t, &MiddlewareConfig{}) } + t.Run("MetricJobGetAvailableDuration", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableDurationMetric{ + Duration: 2500 * time.Millisecond, + Queue: "critical", + }, + }) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1, + attribute.String("queue", "critical")) + require.Equal(t, "s", metric.Unit) + require.InDelta(t, 2.5, metricData.DataPoints[0].Sum, 0.001) + }) + + t.Run("MetricJobGetAvailableDurationUnitMS", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + DurationUnit: "ms", + }) + + middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableDurationMetric{ + Duration: 2500 * time.Millisecond, + Queue: "critical", + }, + }) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1, + attribute.String("queue", "critical")) + require.Equal(t, "ms", metric.Unit) + require.InDelta(t, 2500.0, metricData.DataPoints[0].Sum, 0.001) + }) + + t.Run("MetricJobGetAvailableCount", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableCountMetric{ + Count: 42, + Queue: "critical", + }, + }) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + metric, metricData := requireInt64HistogramCount(t, metrics, "river.job_get_available_count", 1, + attribute.String("queue", "critical")) + require.Equal(t, "{job}", metric.Unit) + require.EqualValues(t, 42, metricData.DataPoints[0].Sum) + }) + t.Run("InsertManySuccess", func(t *testing.T) { t.Parallel() @@ -827,6 +890,15 @@ func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, nam return metric, metricData } +func requireInt64HistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[int64]) { //nolint:unparam + t.Helper() + + metric, metricData := requireMetric[metricdata.Histogram[int64]](t, metrics, name) + require.Equal(t, count, metricData.DataPoints[0].Count) + metricdatatest.AssertHasAttributes(t, metric, attrs...) + return metric, metricData +} + func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) { t.Helper()