Skip to content

Commit 5d40b59

Browse files
committed
Address review comments
1 parent cbd5c1a commit 5d40b59

2 files changed

Lines changed: 72 additions & 28 deletions

File tree

core/capabilities/ccip/oraclecreator/observation_metrics_collector.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"math"
77
"strings"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -27,8 +28,8 @@ type ObservationMetricsPublisher interface {
2728
type ObservationMetricsCollector struct {
2829
logger logger.Logger
2930
publisher ObservationMetricsPublisher
30-
ctx context.Context
31-
cancel context.CancelFunc
31+
stop chan struct{}
32+
stopOnce sync.Once
3233
constantLabels map[string]string // Prometheus labels (for WrapRegistererWith)
3334
beholderLabels map[string]string // Beholder labels (for metrics publishing)
3435

@@ -44,13 +45,10 @@ func NewObservationMetricsCollector(
4445
constantLabels map[string]string,
4546
beholderLabels map[string]string,
4647
) *ObservationMetricsCollector {
47-
ctx, cancel := context.WithCancel(context.Background())
48-
4948
collector := &ObservationMetricsCollector{
5049
logger: logger,
5150
publisher: publisher,
52-
ctx: ctx,
53-
cancel: cancel,
51+
stop: make(chan struct{}),
5452
constantLabels: constantLabels,
5553
beholderLabels: beholderLabels,
5654
}
@@ -71,14 +69,17 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro
7169
// Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle),
7270
// so that the counters are already registered before the first poll fires.
7371
func (c *ObservationMetricsCollector) Start(interval time.Duration) {
72+
if interval <= 0 {
73+
interval = defaultPollingInterval
74+
}
7475
go func() {
7576
ticker := time.NewTicker(interval)
7677
defer ticker.Stop()
7778
for {
7879
select {
7980
case <-ticker.C:
8081
c.poll()
81-
case <-c.ctx.Done():
82+
case <-c.stop:
8283
return
8384
}
8485
}
@@ -99,9 +100,9 @@ func (c *ObservationMetricsCollector) poll() {
99100
}
100101
}
101102

102-
// Close stops the collector
103+
// Close stops the background polling goroutine. Safe to call multiple times.
103104
func (c *ObservationMetricsCollector) Close() error {
104-
c.cancel()
105+
c.stopOnce.Do(func() { close(c.stop) })
105106
return nil
106107
}
107108

@@ -132,26 +133,29 @@ func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) {
132133
// Try to extract the counter value from the metric
133134
var metricValue float64
134135
if err := extractCounterValue(m, &metricValue); err == nil {
135-
// Load the last value atomically
136-
lastBits := atomic.LoadUint64(&w.lastValueBits)
137-
lastValue := math.Float64frombits(lastBits)
138-
139-
if metricValue > lastValue {
140-
delta := metricValue - lastValue
141-
// Store the new value atomically
142-
atomic.StoreUint64(&w.lastValueBits, math.Float64bits(metricValue))
143-
144-
w.logger.Debugw("Observation metric incremented",
145-
"metric", w.metricName,
146-
"value", metricValue,
147-
"delta", delta,
148-
"labels", w.labels,
149-
)
150-
151-
if w.publisher != nil {
152-
// Publish the delta, not the cumulative value
153-
w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels)
136+
// CAS loop: ensures only one concurrent caller (background poll or Prometheus
137+
// scrape) advances lastValueBits and publishes the delta for a given interval.
138+
for {
139+
lastBits := atomic.LoadUint64(&w.lastValueBits)
140+
lastValue := math.Float64frombits(lastBits)
141+
if metricValue <= lastValue {
142+
break
143+
}
144+
newBits := math.Float64bits(metricValue)
145+
if atomic.CompareAndSwapUint64(&w.lastValueBits, lastBits, newBits) {
146+
delta := metricValue - lastValue
147+
w.logger.Debugw("Observation metric incremented",
148+
"metric", w.metricName,
149+
"value", metricValue,
150+
"delta", delta,
151+
"labels", w.labels,
152+
)
153+
if w.publisher != nil {
154+
w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels)
155+
}
156+
break
154157
}
158+
// CAS failed — another concurrent Collect advanced lastValueBits; retry.
155159
}
156160
}
157161

core/capabilities/ccip/oraclecreator/observation_metrics_collector_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,46 @@ func TestObservationMetricsCollector_BackgroundPolling(t *testing.T) {
498498
assert.InEpsilon(t, 1.0, mockPub.getMetrics()[1].value, 0.01)
499499
}
500500

501+
// TestObservationMetricsCollector_CloseStopsPolling verifies that Close stops the background
502+
// goroutine and no further publishes occur even if the counter keeps incrementing.
503+
func TestObservationMetricsCollector_CloseStopsPolling(t *testing.T) {
504+
lggr, err := logger.New()
505+
require.NoError(t, err)
506+
507+
mockPub := &mockPublisher{}
508+
collector := NewObservationMetricsCollector(lggr, mockPub,
509+
map[string]string{"name": "commit-1234"},
510+
map[string]string{"pluginType": "commit"},
511+
)
512+
513+
registry := prometheus.NewRegistry()
514+
wrappedRegisterer := collector.CreateWrappedRegisterer(registry)
515+
516+
sentCounter := prometheus.NewCounter(prometheus.CounterOpts{
517+
Name: "ocr3_sent_observations_total",
518+
Help: "Test counter",
519+
})
520+
require.NoError(t, wrappedRegisterer.Register(sentCounter))
521+
522+
sentCounter.Inc()
523+
524+
const pollInterval = 50 * time.Millisecond
525+
collector.Start(pollInterval)
526+
527+
// Wait for the first publish.
528+
require.Eventually(t, func() bool {
529+
return len(mockPub.getMetrics()) > 0
530+
}, 500*time.Millisecond, 10*time.Millisecond)
531+
532+
require.NoError(t, collector.Close())
533+
534+
// Increment again after Close — the poller should no longer be running.
535+
sentCounter.Inc()
536+
time.Sleep(3 * pollInterval)
537+
538+
assert.Len(t, mockPub.getMetrics(), 1, "no new publishes expected after Close")
539+
}
540+
501541
// TestObservationMetricsCollector_Close verifies proper cleanup
502542
func TestObservationMetricsCollector_Close(t *testing.T) {
503543
lggr, err := logger.New()

0 commit comments

Comments
 (0)