diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go index ba64904c964..9bf5c4811d6 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector.go @@ -3,9 +3,9 @@ package oraclecreator import ( "context" "errors" - "math" "strings" - "sync/atomic" + "sync" + "time" "github.com/prometheus/client_golang/prometheus" prometheus_dto "github.com/prometheus/client_model/go" @@ -13,6 +13,10 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" ) +// defaultPollingInterval is the interval at which the collector polls counters and publishes +// deltas to Beholder, independent of Prometheus scrapes. +const defaultPollingInterval = 10 * time.Second + // ObservationMetricsPublisher is the interface for publishing observation metrics to external destinations type ObservationMetricsPublisher interface { PublishMetric(ctx context.Context, metricName string, value float64, labels map[string]string) @@ -22,7 +26,9 @@ type ObservationMetricsPublisher interface { type ObservationMetricsCollector struct { logger logger.Logger publisher ObservationMetricsPublisher - cancel context.CancelFunc + stop chan struct{} + startOnce sync.Once + stopOnce sync.Once constantLabels map[string]string // Prometheus labels (for WrapRegistererWith) beholderLabels map[string]string // Beholder labels (for metrics publishing) @@ -41,7 +47,7 @@ func NewObservationMetricsCollector( collector := &ObservationMetricsCollector{ logger: logger, publisher: publisher, - cancel: func() {}, + stop: make(chan struct{}), constantLabels: constantLabels, beholderLabels: beholderLabels, } @@ -57,9 +63,44 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro } } -// Close stops the collector +// Start launches a background goroutine that polls the wrapped counters on the given interval +// and publishes deltas to Beholder, independent of Prometheus scrapes. +// Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle), +// so that the counters are already registered before the first poll fires. +// Safe to call multiple times; only the first call starts the goroutine. +func (c *ObservationMetricsCollector) Start(interval time.Duration) { + if interval <= 0 { + interval = defaultPollingInterval + } + c.startOnce.Do(func() { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.poll() + case <-c.stop: + return + } + } + }() + }) +} + +// poll reads the current value of each wrapped counter and publishes any delta to Beholder. +func (c *ObservationMetricsCollector) poll() { + if c.sentObservationsCounter != nil { + c.sentObservationsCounter.readAndPublish() + } + if c.includedObservationsCounter != nil { + c.includedObservationsCounter.readAndPublish() + } +} + +// Close stops the background polling goroutine. Safe to call multiple times. func (c *ObservationMetricsCollector) Close() error { - c.cancel() + c.stopOnce.Do(func() { close(c.stop) }) return nil } @@ -67,54 +108,41 @@ func (c *ObservationMetricsCollector) Close() error { // to intercept Collect() calls and track value changes type wrappedCounter struct { prometheus.Collector - metricName string - labels map[string]string // Beholder labels (for metrics publishing) - publisher ObservationMetricsPublisher - logger logger.Logger - lastValueBits uint64 // stores float64 as bits for atomic operations + metricName string + labels map[string]string // Beholder labels (for metrics publishing) + publisher ObservationMetricsPublisher + logger logger.Logger + lastValue float64 } -// Collect intercepts metric collection to detect counter increments -func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) { - // Create a channel to intercept metrics - interceptCh := make(chan prometheus.Metric, 10) - - // Collect from the underlying collector - go func() { - w.Collector.Collect(interceptCh) - close(interceptCh) - }() - - // Forward metrics and track counter value - for m := range interceptCh { - // Try to extract the counter value from the metric +// readAndPublish reads the current counter value and publishes any delta to Beholder. +// Only called sequentially by the background poller goroutine, so lastValue requires +// no synchronisation. +func (w *wrappedCounter) readAndPublish() { + // Buffer sized to the typical max Prometheus series per counter (1 for a plain + // Counter, more for a CounterVec). Sized generously to avoid blocking Collect. + ch := make(chan prometheus.Metric, 16) + w.Collect(ch) + close(ch) + + for m := range ch { var metricValue float64 - if err := extractCounterValue(m, &metricValue); err == nil { - // Load the last value atomically - lastBits := atomic.LoadUint64(&w.lastValueBits) - lastValue := math.Float64frombits(lastBits) - - if metricValue > lastValue { - delta := metricValue - lastValue - // Store the new value atomically - atomic.StoreUint64(&w.lastValueBits, math.Float64bits(metricValue)) - - w.logger.Debugw("Observation metric incremented", - "metric", w.metricName, - "value", metricValue, - "delta", delta, - "labels", w.labels, - ) - - if w.publisher != nil { - // Publish the delta, not the cumulative value - w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) - } + if err := extractCounterValue(m, &metricValue); err != nil { + continue + } + delta := metricValue - w.lastValue + if delta > 0 { + w.lastValue = metricValue + w.logger.Debugw("Observation metric incremented", + "metric", w.metricName, + "value", metricValue, + "delta", delta, + "labels", w.labels, + ) + if w.publisher != nil { + w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) } } - - // Forward the metric to the actual channel - ch <- m } } @@ -136,26 +164,6 @@ func extractCounterValue(m prometheus.Metric, value *float64) error { return errors.New("metric is not a counter") } -// Describe implements prometheus.Collector -func (c *ObservationMetricsCollector) Describe(ch chan<- *prometheus.Desc) { - if c.sentObservationsCounter != nil { - c.sentObservationsCounter.Describe(ch) - } - if c.includedObservationsCounter != nil { - c.includedObservationsCounter.Describe(ch) - } -} - -// Collect implements prometheus.Collector -func (c *ObservationMetricsCollector) Collect(ch chan<- prometheus.Metric) { - if c.sentObservationsCounter != nil { - c.sentObservationsCounter.Collect(ch) - } - if c.includedObservationsCounter != nil { - c.includedObservationsCounter.Collect(ch) - } -} - // interceptingRegisterer wraps a Prometheus registerer to intercept specific metric registrations type interceptingRegisterer struct { base prometheus.Registerer diff --git a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go index 741ba31cd3d..477ab1a245a 100644 --- a/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go +++ b/core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go @@ -95,17 +95,7 @@ func TestObservationMetricsCollector(t *testing.T) { // Increment the counters (simulating what libocr does) sentCounter.Inc() - - // Trigger a collection to detect the increment - metricChan := make(chan prometheus.Metric, 10) - collector.sentObservationsCounter.Collect(metricChan) - close(metricChan) - // Drain the channel - for range metricChan { //nolint:revive // Intentionally draining channel - } - - // Wait for async publishing - time.Sleep(100 * time.Millisecond) + collector.sentObservationsCounter.readAndPublish() // Check that the metric was published with Beholder labels metrics := mockPub.getMetrics() @@ -124,27 +114,13 @@ func TestObservationMetricsCollector(t *testing.T) { // Increment multiple times and trigger collections sentCounter.Inc() - metricChan = make(chan prometheus.Metric, 10) - collector.sentObservationsCounter.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } + collector.sentObservationsCounter.readAndPublish() includedCounter.Inc() - metricChan = make(chan prometheus.Metric, 10) - collector.includedObservationsCounter.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } + collector.includedObservationsCounter.readAndPublish() includedCounter.Inc() - metricChan = make(chan prometheus.Metric, 10) - collector.includedObservationsCounter.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - - time.Sleep(100 * time.Millisecond) + collector.includedObservationsCounter.readAndPublish() metrics = mockPub.getMetrics() assert.GreaterOrEqual(t, len(metrics), 3, "Expected at least 3 metrics to be published") @@ -202,12 +178,7 @@ func TestWrappedCounter(t *testing.T) { // Test Inc() - increment the base counter and trigger collection baseCounter.Inc() - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(50 * time.Millisecond) + wrapped.readAndPublish() metrics := mockPub.getMetrics() require.Len(t, metrics, 1) @@ -222,12 +193,7 @@ func TestWrappedCounter(t *testing.T) { // Test Add() - increment by 5 and trigger collection baseCounter.Add(5.0) - metricChan = make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(50 * time.Millisecond) + wrapped.readAndPublish() metrics = mockPub.getMetrics() require.Len(t, metrics, 2) @@ -240,8 +206,9 @@ func TestWrappedCounter(t *testing.T) { assert.Equal(t, "Arbitrum", metrics[1].labels["networkName"]) } -// TestWrappedCounter_AtomicOperations verifies that atomic operations work correctly under concurrent access -func TestWrappedCounter_AtomicOperations(t *testing.T) { +// TestWrappedCounter_ConcurrentIncrements verifies that the total delta is correctly published +// when the underlying counter is incremented concurrently from multiple goroutines. +func TestWrappedCounter_ConcurrentIncrements(t *testing.T) { lggr, err := logger.New() require.NoError(t, err) @@ -279,14 +246,7 @@ func TestWrappedCounter_AtomicOperations(t *testing.T) { <-done } - // Trigger a collection to detect all the increments - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - - time.Sleep(100 * time.Millisecond) + wrapped.readAndPublish() // Verify that we published the total delta metrics := mockPub.getMetrics() @@ -316,15 +276,7 @@ func TestWrappedCounter_DeltaPublishing(t *testing.T) { logger: lggr, } - // Helper function to increment and collect - collectMetrics := func() { - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(10 * time.Millisecond) - } + collectMetrics := func() { wrapped.readAndPublish() } // Test sequence: Inc(), Inc(), Add(5), Inc(), Add(10) baseCounter.Inc() // Should publish 1 @@ -380,15 +332,7 @@ func TestWrappedCounter_AddWithFractionalValues(t *testing.T) { logger: lggr, } - // Helper function to collect metrics - collectMetrics := func() { - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } - time.Sleep(10 * time.Millisecond) - } + collectMetrics := func() { wrapped.readAndPublish() } // Test with fractional values baseCounter.Add(2.7) @@ -447,6 +391,97 @@ func TestObservationMetricsCollector_NonTargetMetrics(t *testing.T) { assert.Nil(t, collector.includedObservationsCounter) } +// TestObservationMetricsCollector_BackgroundPolling verifies that Start publishes metrics +// on a timer without any explicit Collect call from the outside (i.e. without a Prometheus scrape). +func TestObservationMetricsCollector_BackgroundPolling(t *testing.T) { + lggr, err := logger.New() + require.NoError(t, err) + + mockPub := &mockPublisher{} + collector := NewObservationMetricsCollector(lggr, mockPub, + map[string]string{"name": "commit-1234"}, + map[string]string{"pluginType": "commit"}, + ) + defer func() { _ = collector.Close() }() + + registry := prometheus.NewRegistry() + wrappedRegisterer := collector.CreateWrappedRegisterer(registry) + + sentCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ocr3_sent_observations_total", + Help: "Test counter", + }) + require.NoError(t, wrappedRegisterer.Register(sentCounter)) + + sentCounter.Add(3) + + // Start with a short interval — no external Collect call is made. + const pollInterval = 50 * time.Millisecond + collector.Start(pollInterval) + + // Wait for at least one poll to fire. + require.Eventually(t, func() bool { + return len(mockPub.getMetrics()) > 0 + }, 500*time.Millisecond, 10*time.Millisecond) + + metrics := mockPub.getMetrics() + require.Len(t, metrics, 1) + assert.Equal(t, "ocr3_sent_observations_total", metrics[0].name) + assert.InEpsilon(t, 3.0, metrics[0].value, 0.01) + + // Subsequent polls with no new increments should not publish. + time.Sleep(3 * pollInterval) + assert.Len(t, mockPub.getMetrics(), 1, "no new publishes expected when counter has not changed") + + // A new increment should be picked up on the next poll. + sentCounter.Inc() + require.Eventually(t, func() bool { + return len(mockPub.getMetrics()) >= 2 + }, 500*time.Millisecond, 10*time.Millisecond) + + assert.InEpsilon(t, 1.0, mockPub.getMetrics()[1].value, 0.01) +} + +// TestObservationMetricsCollector_CloseStopsPolling verifies that Close stops the background +// goroutine and no further publishes occur even if the counter keeps incrementing. +func TestObservationMetricsCollector_CloseStopsPolling(t *testing.T) { + lggr, err := logger.New() + require.NoError(t, err) + + mockPub := &mockPublisher{} + collector := NewObservationMetricsCollector(lggr, mockPub, + map[string]string{"name": "commit-1234"}, + map[string]string{"pluginType": "commit"}, + ) + + registry := prometheus.NewRegistry() + wrappedRegisterer := collector.CreateWrappedRegisterer(registry) + + sentCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ocr3_sent_observations_total", + Help: "Test counter", + }) + require.NoError(t, wrappedRegisterer.Register(sentCounter)) + + sentCounter.Inc() + + const pollInterval = 50 * time.Millisecond + collector.Start(pollInterval) + + // Wait for the first publish. + require.Eventually(t, func() bool { + return len(mockPub.getMetrics()) > 0 + }, 500*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, collector.Close()) + + // Increment again after Close — the poller should no longer be running. + sentCounter.Inc() + time.Sleep(3 * pollInterval) + + assert.Len(t, mockPub.getMetrics(), 1, "no new publishes expected after Close") +} + // TestObservationMetricsCollector_Close verifies proper cleanup func TestObservationMetricsCollector_Close(t *testing.T) { lggr, err := logger.New() @@ -494,12 +529,6 @@ func TestWrappedCounter_NilPublisher(t *testing.T) { require.NotPanics(t, func() { baseCounter.Inc() baseCounter.Add(5.0) - - // Trigger collection - metricChan := make(chan prometheus.Metric, 10) - wrapped.Collect(metricChan) - close(metricChan) - for range metricChan { //nolint:revive // Intentionally draining channel - } + wrapped.readAndPublish() }) } diff --git a/core/capabilities/ccip/oraclecreator/plugin.go b/core/capabilities/ccip/oraclecreator/plugin.go index ef2a9c12250..429eacb27cf 100644 --- a/core/capabilities/ccip/oraclecreator/plugin.go +++ b/core/capabilities/ccip/oraclecreator/plugin.go @@ -77,7 +77,6 @@ type pluginOracleCreator struct { relayers map[types.RelayID]loop.Relayer addressCodec ccipcommon.AddressCodec p2pID p2pkey.KeyV2 - metricsCollector *ObservationMetricsCollector } func NewPluginOracleCreator( @@ -268,7 +267,7 @@ func (i *pluginOracleCreator) Create(ctx context.Context, donID uint32, config c } // Initialize the observation metrics collector to wrap OCR3 metrics - wrappedRegisterer, err := i.setupObservationMetricsCollector(pluginType, telemetryType, chainSelector) + metricsCollector, wrappedRegisterer, err := i.setupObservationMetricsCollector(pluginType, telemetryType, chainSelector) if err != nil { return nil, fmt.Errorf("failed to setup observation metrics collector: %w", err) } @@ -314,7 +313,9 @@ func (i *pluginOracleCreator) Create(ctx context.Context, donID uint32, config c closers = append(closers, cw) } // Add metrics collector to closers so it's properly shut down - closers = append(closers, i.metricsCollector) + closers = append(closers, metricsCollector) + // Start the background polling loop after NewOracle so all counters are already registered. + metricsCollector.Start(defaultPollingInterval) return newWrappedOracle(oracle, closers), nil } @@ -861,21 +862,21 @@ func (i *pluginOracleCreator) setupObservationMetricsCollector( pluginType cctypes.PluginType, telemetryType synchronization.TelemetryType, chainSelector uint64, -) (prometheus.Registerer, error) { +) (*ObservationMetricsCollector, prometheus.Registerer, error) { // Get chain details for Beholder labels chainID, err := chainsel.GetChainIDFromSelector(chainSelector) if err != nil { - return nil, fmt.Errorf("failed to get chain ID from selector %d: %w", chainSelector, err) + return nil, nil, fmt.Errorf("failed to get chain ID from selector %d: %w", chainSelector, err) } chainFamily, err := chainsel.GetSelectorFamily(chainSelector) if err != nil { - return nil, fmt.Errorf("failed to get chain family from selector %d: %w", chainSelector, err) + return nil, nil, fmt.Errorf("failed to get chain family from selector %d: %w", chainSelector, err) } networkName, err := chainsel.GetChainNameFromSelector(chainSelector) if err != nil { - return nil, fmt.Errorf("failed to get network name from selector %d: %w", chainSelector, err) + return nil, nil, fmt.Errorf("failed to get network name from selector %d: %w", chainSelector, err) } // Determine the plugin type name for labels @@ -901,7 +902,7 @@ func (i *pluginOracleCreator) setupObservationMetricsCollector( string(telemetryType), // Use telemetryType (ocr3-ccip-commit or ocr3-ccip-exec) ) if err != nil { - return nil, fmt.Errorf("failed to create Beholder metrics publisher: %w", err) + return nil, nil, fmt.Errorf("failed to create Beholder metrics publisher: %w", err) } // Define Prometheus constant labels (keep existing format to avoid breaking changes) @@ -920,14 +921,11 @@ func (i *pluginOracleCreator) setupObservationMetricsCollector( beholderLabels, ) - // Store the collector so we can close it later - i.metricsCollector = metricsCollector - // Create a wrapped registerer that intercepts observation metric registrations // Don't use WrapRegistererWith here as it would wrap the collectors and prevent // us from intercepting Inc() calls on counters. Instead, we'll handle the wrapping // ourselves in the intercepting registerer. wrappedRegisterer := metricsCollector.CreateWrappedRegisterer(prometheus.DefaultRegisterer) - return wrappedRegisterer, nil + return metricsCollector, wrappedRegisterer, nil }