From cbd5c1a425d687ab7841b0a4c94ab034e6221aef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Thu, 26 Mar 2026 13:07:23 +0100 Subject: [PATCH 1/7] fix: decouple Beholder observation metrics from Prometheus scrape cycle --- .../observation_metrics_collector.go | 44 +++++++++++++++- .../observation_metrics_collector_test.go | 51 +++++++++++++++++++ .../capabilities/ccip/oraclecreator/plugin.go | 2 + 3 files changed, 96 insertions(+), 1 deletion(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index ba64904c964..6fedc5c0715 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -6,6 +6,7 @@ import ( "math" "strings" "sync/atomic" + "time" "github.com/prometheus/client_golang/prometheus" prometheus_dto "github.com/prometheus/client_model/go" @@ -13,6 +14,10 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" ) +// defaultPollingInterval is the interval at which the collector polls counters and publishes +// deltas to Beholder, independent of Prometheus scrapes. +const defaultPollingInterval = 10 * time.Second + // ObservationMetricsPublisher is the interface for publishing observation metrics to external destinations type ObservationMetricsPublisher interface { PublishMetric(ctx context.Context, metricName string, value float64, labels map[string]string) @@ -22,6 +27,7 @@ type ObservationMetricsPublisher interface { type ObservationMetricsCollector struct { logger logger.Logger publisher ObservationMetricsPublisher + ctx context.Context cancel context.CancelFunc constantLabels map[string]string // Prometheus labels (for WrapRegistererWith) beholderLabels map[string]string // Beholder labels (for metrics publishing) @@ -38,10 +44,13 @@ func NewObservationMetricsCollector( constantLabels map[string]string, beholderLabels map[string]string, ) *ObservationMetricsCollector { + ctx, cancel := context.WithCancel(context.Background()) + collector := &ObservationMetricsCollector{ logger: logger, publisher: publisher, - cancel: func() {}, + ctx: ctx, + cancel: cancel, constantLabels: constantLabels, beholderLabels: beholderLabels, } @@ -57,6 +66,39 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro } } +// Start launches a background goroutine that polls the wrapped counters on the given interval +// and publishes deltas to Beholder, independent of Prometheus scrapes. +// Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle), +// so that the counters are already registered before the first poll fires. +func (c *ObservationMetricsCollector) Start(interval time.Duration) { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.poll() + case <-c.ctx.Done(): + return + } + } + }() +} + +// poll reads the current value of each wrapped counter and publishes any delta to Beholder. +// It reuses the existing Collect logic on wrappedCounter, which handles delta tracking. +func (c *ObservationMetricsCollector) poll() { + // A small buffered channel to absorb the forwarded prometheus.Metric values. + // Each counter emits exactly one metric per Collect call, so 10 is more than enough. + devNull := make(chan prometheus.Metric, 10) + if c.sentObservationsCounter != nil { + c.sentObservationsCounter.Collect(devNull) + } + if c.includedObservationsCounter != nil { + c.includedObservationsCounter.Collect(devNull) + } +} + // Close stops the collector func (c *ObservationMetricsCollector) Close() error { c.cancel() diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go index 741ba31cd3d..93bc7a9a065 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go @@ -447,6 +447,57 @@ func TestObservationMetricsCollector_NonTargetMetrics(t *testing.T) { assert.Nil(t, collector.includedObservationsCounter) } +// TestObservationMetricsCollector_BackgroundPolling verifies that Start publishes metrics +// on a timer without any explicit Collect call from the outside (i.e. without a Prometheus scrape). +func TestObservationMetricsCollector_BackgroundPolling(t *testing.T) { + lggr, err := logger.New() + require.NoError(t, err) + + mockPub := &mockPublisher{} + collector := NewObservationMetricsCollector(lggr, mockPub, + map[string]string{"name": "commit-1234"}, + map[string]string{"pluginType": "commit"}, + ) + defer func() { _ = collector.Close() }() + + registry := prometheus.NewRegistry() + wrappedRegisterer := collector.CreateWrappedRegisterer(registry) + + sentCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ocr3_sent_observations_total", + Help: "Test counter", + }) + require.NoError(t, wrappedRegisterer.Register(sentCounter)) + + sentCounter.Add(3) + + // Start with a short interval — no external Collect call is made. + const pollInterval = 50 * time.Millisecond + collector.Start(pollInterval) + + // Wait for at least one poll to fire. + require.Eventually(t, func() bool { + return len(mockPub.getMetrics()) > 0 + }, 500*time.Millisecond, 10*time.Millisecond) + + metrics := mockPub.getMetrics() + require.Len(t, metrics, 1) + assert.Equal(t, "ocr3_sent_observations_total", metrics[0].name) + assert.InEpsilon(t, 3.0, metrics[0].value, 0.01) + + // Subsequent polls with no new increments should not publish. + time.Sleep(3 * pollInterval) + assert.Len(t, mockPub.getMetrics(), 1, "no new publishes expected when counter has not changed") + + // A new increment should be picked up on the next poll. + sentCounter.Inc() + require.Eventually(t, func() bool { + return len(mockPub.getMetrics()) >= 2 + }, 500*time.Millisecond, 10*time.Millisecond) + + assert.InEpsilon(t, 1.0, mockPub.getMetrics()[1].value, 0.01) +} + // TestObservationMetricsCollector_Close verifies proper cleanup func TestObservationMetricsCollector_Close(t *testing.T) { lggr, err := logger.New() diff --git a/core/capabilities/ccip/oraclecreator/plugin.go b/core/capabilities/ccip/oraclecreator/plugin.go index ef2a9c12250..e9d9a120f1e 100644 --- a/core/capabilities/ccip/oraclecreator/plugin.go +++ b/core/capabilities/ccip/oraclecreator/plugin.go @@ -315,6 +315,8 @@ func (i *pluginOracleCreator) Create(ctx context.Context, donID uint32, config c } // Add metrics collector to closers so it's properly shut down closers = append(closers, i.metricsCollector) + // Start the background polling loop after NewOracle so all counters are already registered. + i.metricsCollector.Start(defaultPollingInterval) return newWrappedOracle(oracle, closers), nil } From 5d40b591231afee0c1204f6b315aebdd57784f63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Thu, 26 Mar 2026 13:38:42 +0100 Subject: [PATCH 2/7] Address review comments --- .../observation_metrics_collector.go | 60 ++++++++++--------- .../observation_metrics_collector_test.go | 40 +++++++++++++ 2 files changed, 72 insertions(+), 28 deletions(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index 6fedc5c0715..f20b538d3b0 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -5,6 +5,7 @@ import ( "errors" "math" "strings" + "sync" "sync/atomic" "time" @@ -27,8 +28,8 @@ type ObservationMetricsPublisher interface { type ObservationMetricsCollector struct { logger logger.Logger publisher ObservationMetricsPublisher - ctx context.Context - cancel context.CancelFunc + stop chan struct{} + stopOnce sync.Once constantLabels map[string]string // Prometheus labels (for WrapRegistererWith) beholderLabels map[string]string // Beholder labels (for metrics publishing) @@ -44,13 +45,10 @@ func NewObservationMetricsCollector( constantLabels map[string]string, beholderLabels map[string]string, ) *ObservationMetricsCollector { - ctx, cancel := context.WithCancel(context.Background()) - collector := &ObservationMetricsCollector{ logger: logger, publisher: publisher, - ctx: ctx, - cancel: cancel, + stop: make(chan struct{}), constantLabels: constantLabels, beholderLabels: beholderLabels, } @@ -71,6 +69,9 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro // Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle), // so that the counters are already registered before the first poll fires. func (c *ObservationMetricsCollector) Start(interval time.Duration) { + if interval <= 0 { + interval = defaultPollingInterval + } go func() { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -78,7 +79,7 @@ func (c *ObservationMetricsCollector) Start(interval time.Duration) { select { case <-ticker.C: c.poll() - case <-c.ctx.Done(): + case <-c.stop: return } } @@ -99,9 +100,9 @@ func (c *ObservationMetricsCollector) poll() { } } -// Close stops the collector +// Close stops the background polling goroutine. Safe to call multiple times. func (c *ObservationMetricsCollector) Close() error { - c.cancel() + c.stopOnce.Do(func() { close(c.stop) }) return nil } @@ -132,26 +133,29 @@ func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) { // Try to extract the counter value from the metric var metricValue float64 if err := extractCounterValue(m, &metricValue); err == nil { - // Load the last value atomically - lastBits := atomic.LoadUint64(&w.lastValueBits) - lastValue := math.Float64frombits(lastBits) - - if metricValue > lastValue { - delta := metricValue - lastValue - // Store the new value atomically - atomic.StoreUint64(&w.lastValueBits, math.Float64bits(metricValue)) - - w.logger.Debugw("Observation metric incremented", - "metric", w.metricName, - "value", metricValue, - "delta", delta, - "labels", w.labels, - ) - - if w.publisher != nil { - // Publish the delta, not the cumulative value - w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) + // CAS loop: ensures only one concurrent caller (background poll or Prometheus + // scrape) advances lastValueBits and publishes the delta for a given interval. + for { + lastBits := atomic.LoadUint64(&w.lastValueBits) + lastValue := math.Float64frombits(lastBits) + if metricValue <= lastValue { + break + } + newBits := math.Float64bits(metricValue) + if atomic.CompareAndSwapUint64(&w.lastValueBits, lastBits, newBits) { + delta := metricValue - lastValue + w.logger.Debugw("Observation metric incremented", + "metric", w.metricName, + "value", metricValue, + "delta", delta, + "labels", w.labels, + ) + if w.publisher != nil { + w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) + } + break } + // CAS failed — another concurrent Collect advanced lastValueBits; retry. } } diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go index 93bc7a9a065..d18c359721b 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go @@ -498,6 +498,46 @@ func TestObservationMetricsCollector_BackgroundPolling(t *testing.T) { assert.InEpsilon(t, 1.0, mockPub.getMetrics()[1].value, 0.01) } +// TestObservationMetricsCollector_CloseStopsPolling verifies that Close stops the background +// goroutine and no further publishes occur even if the counter keeps incrementing. +func TestObservationMetricsCollector_CloseStopsPolling(t *testing.T) { + lggr, err := logger.New() + require.NoError(t, err) + + mockPub := &mockPublisher{} + collector := NewObservationMetricsCollector(lggr, mockPub, + map[string]string{"name": "commit-1234"}, + map[string]string{"pluginType": "commit"}, + ) + + registry := prometheus.NewRegistry() + wrappedRegisterer := collector.CreateWrappedRegisterer(registry) + + sentCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ocr3_sent_observations_total", + Help: "Test counter", + }) + require.NoError(t, wrappedRegisterer.Register(sentCounter)) + + sentCounter.Inc() + + const pollInterval = 50 * time.Millisecond + collector.Start(pollInterval) + + // Wait for the first publish. + require.Eventually(t, func() bool { + return len(mockPub.getMetrics()) > 0 + }, 500*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, collector.Close()) + + // Increment again after Close — the poller should no longer be running. + sentCounter.Inc() + time.Sleep(3 * pollInterval) + + assert.Len(t, mockPub.getMetrics(), 1, "no new publishes expected after Close") +} + // TestObservationMetricsCollector_Close verifies proper cleanup func TestObservationMetricsCollector_Close(t *testing.T) { lggr, err := logger.New() From 961497de5356baa0c101828e69a40f8d7e5c1aec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Tue, 31 Mar 2026 17:12:49 +0200 Subject: [PATCH 3/7] Add startOnce --- .../observation_metrics_collector.go | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index f20b538d3b0..b8a3cde8bf5 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -29,6 +29,7 @@ type ObservationMetricsCollector struct { logger logger.Logger publisher ObservationMetricsPublisher stop chan struct{} + startOnce sync.Once stopOnce sync.Once constantLabels map[string]string // Prometheus labels (for WrapRegistererWith) beholderLabels map[string]string // Beholder labels (for metrics publishing) @@ -68,22 +69,25 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro // and publishes deltas to Beholder, independent of Prometheus scrapes. // Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle), // so that the counters are already registered before the first poll fires. +// Safe to call multiple times; only the first call starts the goroutine. func (c *ObservationMetricsCollector) Start(interval time.Duration) { if interval <= 0 { interval = defaultPollingInterval } - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - c.poll() - case <-c.stop: - return + c.startOnce.Do(func() { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.poll() + case <-c.stop: + return + } } - } - }() + }() + }) } // poll reads the current value of each wrapped counter and publishes any delta to Beholder. From ed8bbc2156924822d9ecb71aaf948ffcb0b12d01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Tue, 31 Mar 2026 17:23:09 +0200 Subject: [PATCH 4/7] Remove the unnecessary CAS loop --- .../observation_metrics_collector.go | 77 ++++++++----------- 1 file changed, 30 insertions(+), 47 deletions(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index b8a3cde8bf5..c5f4b1274d2 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -3,10 +3,8 @@ package oraclecreator import ( "context" "errors" - "math" "strings" "sync" - "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -91,16 +89,12 @@ func (c *ObservationMetricsCollector) Start(interval time.Duration) { } // poll reads the current value of each wrapped counter and publishes any delta to Beholder. -// It reuses the existing Collect logic on wrappedCounter, which handles delta tracking. func (c *ObservationMetricsCollector) poll() { - // A small buffered channel to absorb the forwarded prometheus.Metric values. - // Each counter emits exactly one metric per Collect call, so 10 is more than enough. - devNull := make(chan prometheus.Metric, 10) if c.sentObservationsCounter != nil { - c.sentObservationsCounter.Collect(devNull) + c.sentObservationsCounter.readAndPublish() } if c.includedObservationsCounter != nil { - c.includedObservationsCounter.Collect(devNull) + c.includedObservationsCounter.readAndPublish() } } @@ -118,53 +112,42 @@ type wrappedCounter struct { labels map[string]string // Beholder labels (for metrics publishing) publisher ObservationMetricsPublisher logger logger.Logger - lastValueBits uint64 // stores float64 as bits for atomic operations + lastValue float64 + mu sync.Mutex } -// Collect intercepts metric collection to detect counter increments -func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) { - // Create a channel to intercept metrics - interceptCh := make(chan prometheus.Metric, 10) - - // Collect from the underlying collector +// readAndPublish reads the current counter value and publishes any delta to Beholder. +// Only called by the background poller, never by Prometheus scrapes, so no concurrent +// access to lastValue occurs and no CAS is needed. +func (w *wrappedCounter) readAndPublish() { + ch := make(chan prometheus.Metric, 1) go func() { - w.Collector.Collect(interceptCh) - close(interceptCh) + w.Collector.Collect(ch) + close(ch) }() - // Forward metrics and track counter value - for m := range interceptCh { - // Try to extract the counter value from the metric + for m := range ch { var metricValue float64 - if err := extractCounterValue(m, &metricValue); err == nil { - // CAS loop: ensures only one concurrent caller (background poll or Prometheus - // scrape) advances lastValueBits and publishes the delta for a given interval. - for { - lastBits := atomic.LoadUint64(&w.lastValueBits) - lastValue := math.Float64frombits(lastBits) - if metricValue <= lastValue { - break - } - newBits := math.Float64bits(metricValue) - if atomic.CompareAndSwapUint64(&w.lastValueBits, lastBits, newBits) { - delta := metricValue - lastValue - w.logger.Debugw("Observation metric incremented", - "metric", w.metricName, - "value", metricValue, - "delta", delta, - "labels", w.labels, - ) - if w.publisher != nil { - w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) - } - break - } - // CAS failed — another concurrent Collect advanced lastValueBits; retry. + if err := extractCounterValue(m, &metricValue); err != nil { + continue + } + w.mu.Lock() + delta := metricValue - w.lastValue + if delta > 0 { + w.lastValue = metricValue + w.mu.Unlock() + w.logger.Debugw("Observation metric incremented", + "metric", w.metricName, + "value", metricValue, + "delta", delta, + "labels", w.labels, + ) + if w.publisher != nil { + w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) } + } else { + w.mu.Unlock() } - - // Forward the metric to the actual channel - ch <- m } } From 15873e4cffe9cacfe444f24dabd5491f5e1bc0e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Tue, 31 Mar 2026 17:43:21 +0200 Subject: [PATCH 5/7] Fix tests & other refactor fixes --- .../observation_metrics_collector.go | 26 +----- .../observation_metrics_collector_test.go | 83 +++---------------- 2 files changed, 12 insertions(+), 97 deletions(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index c5f4b1274d2..ef510c94f11 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -121,10 +121,8 @@ type wrappedCounter struct { // access to lastValue occurs and no CAS is needed. func (w *wrappedCounter) readAndPublish() { ch := make(chan prometheus.Metric, 1) - go func() { - w.Collector.Collect(ch) - close(ch) - }() + w.Collector.Collect(ch) + close(ch) for m := range ch { var metricValue float64 @@ -169,26 +167,6 @@ func extractCounterValue(m prometheus.Metric, value *float64) error { return errors.New("metric is not a counter") } -// Describe implements prometheus.Collector -func (c *ObservationMetricsCollector) Describe(ch chan<- *prometheus.Desc) { - if c.sentObservationsCounter != nil { - c.sentObservationsCounter.Describe(ch) - } - if c.includedObservationsCounter != nil { - c.includedObservationsCounter.Describe(ch) - } -} - -// Collect implements prometheus.Collector -func (c *ObservationMetricsCollector) Collect(ch chan<- prometheus.Metric) { - if c.sentObservationsCounter != nil { - c.sentObservationsCounter.Collect(ch) - } - if c.includedObservationsCounter != nil { - c.includedObservationsCounter.Collect(ch) - } -} - // interceptingRegisterer wraps a Prometheus registerer to intercept specific metric registrations type interceptingRegisterer struct { base prometheus.Registerer diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go index d18c359721b..949a0e5e5c5 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go @@ -95,17 +95,7 @@ func TestObservationMetricsCollector(t *testing.T) { // Increment the counters (simulating what libocr does) sentCounter.Inc() - - // Trigger a collection to detect the increment - metricChan := make(chan prometheus.Metric, 10) - collector.sentObservationsCounter.Collect(metricChan) - close(metricChan) - // Drain the channel - for range metricChan { //nolint:revive // Intentionally draining channel - } - - // Wait for async publishing - time.Sleep(100 * time.Millisecond) + collector.sentObservationsCounter.readAndPublish() // Check that the metric was published with Beholder labels metrics := mockPub.getMetrics() @@ -124,27 +114,13 @@ func TestObservationMetricsCollector(t *testing.T) { // Increment multiple times and trigger collections sentCounter.Inc() - metricChan = make(chan prometheus.Metric, 10) - collector.sentObservationsCounter.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } + collector.sentObservationsCounter.readAndPublish() includedCounter.Inc() - metricChan = make(chan prometheus.Metric, 10) - collector.includedObservationsCounter.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } + collector.includedObservationsCounter.readAndPublish() includedCounter.Inc() - metricChan = make(chan prometheus.Metric, 10) - collector.includedObservationsCounter.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - - time.Sleep(100 * time.Millisecond) + collector.includedObservationsCounter.readAndPublish() metrics = mockPub.getMetrics() assert.GreaterOrEqual(t, len(metrics), 3, "Expected at least 3 metrics to be published") @@ -202,12 +178,7 @@ func TestWrappedCounter(t *testing.T) { // Test Inc() - increment the base counter and trigger collection baseCounter.Inc() - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(50 * time.Millisecond) + wrapped.readAndPublish() metrics := mockPub.getMetrics() require.Len(t, metrics, 1) @@ -222,12 +193,7 @@ func TestWrappedCounter(t *testing.T) { // Test Add() - increment by 5 and trigger collection baseCounter.Add(5.0) - metricChan = make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(50 * time.Millisecond) + wrapped.readAndPublish() metrics = mockPub.getMetrics() require.Len(t, metrics, 2) @@ -279,14 +245,7 @@ func TestWrappedCounter_AtomicOperations(t *testing.T) { <-done } - // Trigger a collection to detect all the increments - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - - time.Sleep(100 * time.Millisecond) + wrapped.readAndPublish() // Verify that we published the total delta metrics := mockPub.getMetrics() @@ -316,15 +275,7 @@ func TestWrappedCounter_DeltaPublishing(t *testing.T) { logger: lggr, } - // Helper function to increment and collect - collectMetrics := func() { - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(10 * time.Millisecond) - } + collectMetrics := func() { wrapped.readAndPublish() } // Test sequence: Inc(), Inc(), Add(5), Inc(), Add(10) baseCounter.Inc() // Should publish 1 @@ -380,15 +331,7 @@ func TestWrappedCounter_AddWithFractionalValues(t *testing.T) { logger: lggr, } - // Helper function to collect metrics - collectMetrics := func() { - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(10 * time.Millisecond) - } + collectMetrics := func() { wrapped.readAndPublish() } // Test with fractional values baseCounter.Add(2.7) @@ -585,12 +528,6 @@ func TestWrappedCounter_NilPublisher(t *testing.T) { require.NotPanics(t, func() { baseCounter.Inc() baseCounter.Add(5.0) - - // Trigger collection - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } + wrapped.readAndPublish() }) } From 330b1c893e829e6147675356a954fcec7732b7dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Tue, 31 Mar 2026 17:54:13 +0200 Subject: [PATCH 6/7] Fix --- .../ccip/oraclecreator/observation_metrics_collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index ef510c94f11..5844d9ce6c9 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -121,7 +121,7 @@ type wrappedCounter struct { // access to lastValue occurs and no CAS is needed. func (w *wrappedCounter) readAndPublish() { ch := make(chan prometheus.Metric, 1) - w.Collector.Collect(ch) + w.Collect(ch) close(ch) for m := range ch { From 07712e6472db6baa105429c7fdaa14f1d443b3ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Mat=C5=82aszek?= Date: Wed, 1 Apr 2026 10:26:14 +0200 Subject: [PATCH 7/7] Review fixes --- .../observation_metrics_collector.go | 23 ++++++++----------- .../observation_metrics_collector_test.go | 5 ++-- .../capabilities/ccip/oraclecreator/plugin.go | 22 ++++++++---------- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index 5844d9ce6c9..9bf5c4811d6 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -108,19 +108,20 @@ func (c *ObservationMetricsCollector) Close() error { // to intercept Collect() calls and track value changes type wrappedCounter struct { prometheus.Collector - metricName string - labels map[string]string // Beholder labels (for metrics publishing) - publisher ObservationMetricsPublisher - logger logger.Logger - lastValue float64 - mu sync.Mutex + metricName string + labels map[string]string // Beholder labels (for metrics publishing) + publisher ObservationMetricsPublisher + logger logger.Logger + lastValue float64 } // readAndPublish reads the current counter value and publishes any delta to Beholder. -// Only called by the background poller, never by Prometheus scrapes, so no concurrent -// access to lastValue occurs and no CAS is needed. +// Only called sequentially by the background poller goroutine, so lastValue requires +// no synchronisation. func (w *wrappedCounter) readAndPublish() { - ch := make(chan prometheus.Metric, 1) + // Buffer sized to the typical max Prometheus series per counter (1 for a plain + // Counter, more for a CounterVec). Sized generously to avoid blocking Collect. + ch := make(chan prometheus.Metric, 16) w.Collect(ch) close(ch) @@ -129,11 +130,9 @@ func (w *wrappedCounter) readAndPublish() { if err := extractCounterValue(m, &metricValue); err != nil { continue } - w.mu.Lock() delta := metricValue - w.lastValue if delta > 0 { w.lastValue = metricValue - w.mu.Unlock() w.logger.Debugw("Observation metric incremented", "metric", w.metricName, "value", metricValue, @@ -143,8 +142,6 @@ func (w *wrappedCounter) readAndPublish() { if w.publisher != nil { w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) } - } else { - w.mu.Unlock() } } } diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go index 949a0e5e5c5..477ab1a245a 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go @@ -206,8 +206,9 @@ func TestWrappedCounter(t *testing.T) { assert.Equal(t, "Arbitrum", metrics[1].labels["networkName"]) } -// TestWrappedCounter_AtomicOperations verifies that atomic operations work correctly under concurrent access -func TestWrappedCounter_AtomicOperations(t *testing.T) { +// TestWrappedCounter_ConcurrentIncrements verifies that the total delta is correctly published +// when the underlying counter is incremented concurrently from multiple goroutines. +func TestWrappedCounter_ConcurrentIncrements(t *testing.T) { lggr, err := logger.New() require.NoError(t, err) diff --git a/core/capabilities/ccip/oraclecreator/plugin.go b/core/capabilities/ccip/oraclecreator/plugin.go index e9d9a120f1e..429eacb27cf 100644 --- a/core/capabilities/ccip/oraclecreator/plugin.go +++ b/core/capabilities/ccip/oraclecreator/plugin.go @@ -77,7 +77,6 @@ type pluginOracleCreator struct { relayers map[types.RelayID]loop.Relayer addressCodec ccipcommon.AddressCodec p2pID p2pkey.KeyV2 - metricsCollector *ObservationMetricsCollector } func NewPluginOracleCreator( @@ -268,7 +267,7 @@ func (i *pluginOracleCreator) Create(ctx context.Context, donID uint32, config c } // Initialize the observation metrics collector to wrap OCR3 metrics - wrappedRegisterer, err := i.setupObservationMetricsCollector(pluginType, telemetryType, chainSelector) + metricsCollector, wrappedRegisterer, err := i.setupObservationMetricsCollector(pluginType, telemetryType, chainSelector) if err != nil { return nil, fmt.Errorf("failed to setup observation metrics collector: %w", err) } @@ -314,9 +313,9 @@ func (i *pluginOracleCreator) Create(ctx context.Context, donID uint32, config c closers = append(closers, cw) } // Add metrics collector to closers so it's properly shut down - closers = append(closers, i.metricsCollector) + closers = append(closers, metricsCollector) // Start the background polling loop after NewOracle so all counters are already registered. - i.metricsCollector.Start(defaultPollingInterval) + metricsCollector.Start(defaultPollingInterval) return newWrappedOracle(oracle, closers), nil } @@ -863,21 +862,21 @@ func (i *pluginOracleCreator) setupObservationMetricsCollector( pluginType cctypes.PluginType, telemetryType synchronization.TelemetryType, chainSelector uint64, -) (prometheus.Registerer, error) { +) (*ObservationMetricsCollector, prometheus.Registerer, error) { // Get chain details for Beholder labels chainID, err := chainsel.GetChainIDFromSelector(chainSelector) if err != nil { - return nil, fmt.Errorf("failed to get chain ID from selector %d: %w", chainSelector, err) + return nil, nil, fmt.Errorf("failed to get chain ID from selector %d: %w", chainSelector, err) } chainFamily, err := chainsel.GetSelectorFamily(chainSelector) if err != nil { - return nil, fmt.Errorf("failed to get chain family from selector %d: %w", chainSelector, err) + return nil, nil, fmt.Errorf("failed to get chain family from selector %d: %w", chainSelector, err) } networkName, err := chainsel.GetChainNameFromSelector(chainSelector) if err != nil { - return nil, fmt.Errorf("failed to get network name from selector %d: %w", chainSelector, err) + return nil, nil, fmt.Errorf("failed to get network name from selector %d: %w", chainSelector, err) } // Determine the plugin type name for labels @@ -903,7 +902,7 @@ func (i *pluginOracleCreator) setupObservationMetricsCollector( string(telemetryType), // Use telemetryType (ocr3-ccip-commit or ocr3-ccip-exec) ) if err != nil { - return nil, fmt.Errorf("failed to create Beholder metrics publisher: %w", err) + return nil, nil, fmt.Errorf("failed to create Beholder metrics publisher: %w", err) } // Define Prometheus constant labels (keep existing format to avoid breaking changes) @@ -922,14 +921,11 @@ func (i *pluginOracleCreator) setupObservationMetricsCollector( beholderLabels, ) - // Store the collector so we can close it later - i.metricsCollector = metricsCollector - // Create a wrapped registerer that intercepts observation metric registrations // Don't use WrapRegistererWith here as it would wrap the collectors and prevent // us from intercepting Inc() calls on counters. Instead, we'll handle the wrapping // ourselves in the intercepting registerer. wrappedRegisterer := metricsCollector.CreateWrappedRegisterer(prometheus.DefaultRegisterer) - return wrappedRegisterer, nil + return metricsCollector, wrappedRegisterer, nil }