diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d8ef8f..1f2074b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `otelriver` hook support for River producer metrics, including `river.job_get_available_duration` and `river.job_get_available_count`. + ## [0.10.0] - 2026-06-06 ### Added diff --git a/otelriver/example_middleware_test.go b/otelriver/example_middleware_test.go index ce18a39..8030a01 100644 --- a/otelriver/example_middleware_test.go +++ b/otelriver/example_middleware_test.go @@ -12,12 +12,19 @@ import ( ) func ExampleMiddleware() { + middleware := otelriver.NewMiddleware(nil) + _, err := river.NewClient(riverpgxv5.New(nil), &river.Config{ + Hooks: []rivertype.Hook{ + // Install the OpenTelemetry middleware as a hook to emit producer + // metrics from this River client. + middleware, + }, Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})), Middleware: []rivertype.Middleware{ // Install the OpenTelemetry middleware to run for all jobs inserted // or worked by this River client. - otelriver.NewMiddleware(nil), + middleware, }, TestOnly: true, // suitable only for use in tests; remove for live environments }) diff --git a/otelriver/middleware.go b/otelriver/middleware.go index 63173f3..01bd695 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -58,9 +58,10 @@ type MiddlewareConfig struct { TracerProvider trace.TracerProvider } -// Middleware is a River middleware that emits OpenTelemetry metrics when jobs -// are inserted or worked. +// Middleware is a River middleware and hook that emits OpenTelemetry traces and +// metrics. type Middleware struct { + river.HookDefaults river.MiddlewareDefaults config *MiddlewareConfig @@ -75,6 +76,8 @@ type middlewareMetrics struct { insertManyCount metric.Int64Counter insertManyDuration metric.Float64Gauge insertManyDurationHistogram metric.Float64Histogram + jobGetAvailableDuration metric.Float64Histogram + jobGetAvailableCount metric.Int64Histogram messagingClientConsumedMessages metric.Int64Counter messagingClientOperationDuration metric.Float64Histogram messagingClientSentMessages metric.Int64Counter @@ -117,6 +120,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware { insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), + jobGetAvailableDuration: mustFloat64Histogram(meter, prefix+"job_get_available_duration", metric.WithDescription("Duration of successful JobGetAvailable calls"), metric.WithUnit(durationUnit)), + jobGetAvailableCount: mustInt64Histogram(meter, prefix+"job_get_available_count", metric.WithDescription("Number of jobs locked by successful JobGetAvailable calls"), metric.WithUnit("{job}")), workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), @@ -213,6 +218,21 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job return insertRes, err } +func (m *Middleware) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) { + if params == nil { + return + } + + switch riverMetric := params.Metric.(type) { + case *rivertype.JobGetAvailableDurationMetric: + m.metrics.jobGetAvailableDuration.Record(ctx, m.durationInPreferredUnit(riverMetric.Duration), + metric.WithAttributes(attribute.String("queue", riverMetric.Queue))) + case *rivertype.JobGetAvailableCountMetric: + m.metrics.jobGetAvailableCount.Record(ctx, int64(riverMetric.Count), + metric.WithAttributes(attribute.String("queue", riverMetric.Queue))) + } +} + func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { spanName := prefix + "work" if m.config.EnableWorkSpanJobKindSuffix { @@ -328,6 +348,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo return metric } +func mustInt64Histogram(meter metric.Meter, name string, options ...metric.Int64HistogramOption) metric.Int64Histogram { + metric, err := meter.Int64Histogram(name, options...) + if err != nil { + panic(err) + } + return metric +} + func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter { metric, err := meter.Int64Counter(name, options...) if err != nil { diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 9b35758..9b2c905 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -22,6 +22,7 @@ import ( // Verify interface compliance. var ( _ rivertype.JobInsertMiddleware = &Middleware{} + _ rivertype.HookMetricEmit = &Middleware{} _ rivertype.WorkerMiddleware = &Middleware{} ) @@ -58,6 +59,68 @@ func TestMiddleware(t *testing.T) { return setupConfig(t, &MiddlewareConfig{}) } + t.Run("MetricJobGetAvailableDuration", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableDurationMetric{ + Duration: 2500 * time.Millisecond, + Queue: "critical", + }, + }) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1, + attribute.String("queue", "critical")) + require.Equal(t, "s", metric.Unit) + require.InDelta(t, 2.5, metricData.DataPoints[0].Sum, 0.001) + }) + + t.Run("MetricJobGetAvailableDurationUnitMS", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setupConfig(t, &MiddlewareConfig{ + DurationUnit: "ms", + }) + + middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableDurationMetric{ + Duration: 2500 * time.Millisecond, + Queue: "critical", + }, + }) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1, + attribute.String("queue", "critical")) + require.Equal(t, "ms", metric.Unit) + require.InDelta(t, 2500.0, metricData.DataPoints[0].Sum, 0.001) + }) + + t.Run("MetricJobGetAvailableCount", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableCountMetric{ + Count: 42, + Queue: "critical", + }, + }) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + metric, metricData := requireInt64HistogramCount(t, metrics, "river.job_get_available_count", 1, + attribute.String("queue", "critical")) + require.Equal(t, "{job}", metric.Unit) + require.EqualValues(t, 42, metricData.DataPoints[0].Sum) + }) + t.Run("InsertManySuccess", func(t *testing.T) { t.Parallel() @@ -827,6 +890,15 @@ func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, nam return metric, metricData } +func requireInt64HistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[int64]) { //nolint:unparam + t.Helper() + + metric, metricData := requireMetric[metricdata.Histogram[int64]](t, metrics, name) + require.Equal(t, count, metricData.DataPoints[0].Count) + metricdatatest.AssertHasAttributes(t, metric, attrs...) + return metric, metricData +} + func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) { t.Helper()