Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package oraclecreator
import (
"context"
"errors"
"math"
"strings"
"sync/atomic"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
prometheus_dto "github.com/prometheus/client_model/go"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// defaultPollingInterval is the interval at which the collector polls counters and publishes
// deltas to Beholder, independent of Prometheus scrapes.
const defaultPollingInterval = 10 * time.Second

// ObservationMetricsPublisher is the interface for publishing observation metrics to external destinations
type ObservationMetricsPublisher interface {
PublishMetric(ctx context.Context, metricName string, value float64, labels map[string]string)
Expand All @@ -22,7 +26,9 @@ type ObservationMetricsPublisher interface {
type ObservationMetricsCollector struct {
logger logger.Logger
publisher ObservationMetricsPublisher
cancel context.CancelFunc
stop chan struct{}
startOnce sync.Once
stopOnce sync.Once
constantLabels map[string]string // Prometheus labels (for WrapRegistererWith)
beholderLabels map[string]string // Beholder labels (for metrics publishing)

Expand All @@ -41,7 +47,7 @@ func NewObservationMetricsCollector(
collector := &ObservationMetricsCollector{
logger: logger,
publisher: publisher,
cancel: func() {},
stop: make(chan struct{}),
constantLabels: constantLabels,
beholderLabels: beholderLabels,
}
Expand All @@ -57,64 +63,86 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro
}
}

// Close stops the collector
// Start launches a background goroutine that polls the wrapped counters on the given interval
// and publishes deltas to Beholder, independent of Prometheus scrapes.
// Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle),
// so that the counters are already registered before the first poll fires.
// Safe to call multiple times; only the first call starts the goroutine.
func (c *ObservationMetricsCollector) Start(interval time.Duration) {
if interval <= 0 {
interval = defaultPollingInterval
}
c.startOnce.Do(func() {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.poll()
case <-c.stop:
return
}
}
}()
})
}

// poll reads the current value of each wrapped counter and publishes any delta to Beholder.
func (c *ObservationMetricsCollector) poll() {
if c.sentObservationsCounter != nil {
c.sentObservationsCounter.readAndPublish()
}
if c.includedObservationsCounter != nil {
c.includedObservationsCounter.readAndPublish()
}
}

// Close stops the background polling goroutine. Safe to call multiple times.
func (c *ObservationMetricsCollector) Close() error {
c.cancel()
c.stopOnce.Do(func() { close(c.stop) })
return nil
}

// wrappedCounter wraps a Prometheus collector (which may be a counter or wrappingCollector)
// to intercept Collect() calls and track value changes
type wrappedCounter struct {
prometheus.Collector
metricName string
labels map[string]string // Beholder labels (for metrics publishing)
publisher ObservationMetricsPublisher
logger logger.Logger
lastValueBits uint64 // stores float64 as bits for atomic operations
metricName string
labels map[string]string // Beholder labels (for metrics publishing)
publisher ObservationMetricsPublisher
logger logger.Logger
lastValue float64
}

// Collect intercepts metric collection to detect counter increments
func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) {
// Create a channel to intercept metrics
interceptCh := make(chan prometheus.Metric, 10)

// Collect from the underlying collector
go func() {
w.Collector.Collect(interceptCh)
close(interceptCh)
}()

// Forward metrics and track counter value
for m := range interceptCh {
// Try to extract the counter value from the metric
// readAndPublish reads the current counter value and publishes any delta to Beholder.
// Only called sequentially by the background poller goroutine, so lastValue requires
// no synchronisation.
func (w *wrappedCounter) readAndPublish() {
// Buffer sized to the typical max Prometheus series per counter (1 for a plain
// Counter, more for a CounterVec). Sized generously to avoid blocking Collect.
ch := make(chan prometheus.Metric, 16)
w.Collect(ch)
close(ch)

for m := range ch {
var metricValue float64
if err := extractCounterValue(m, &metricValue); err == nil {
// Load the last value atomically
lastBits := atomic.LoadUint64(&w.lastValueBits)
lastValue := math.Float64frombits(lastBits)

if metricValue > lastValue {
delta := metricValue - lastValue
// Store the new value atomically
atomic.StoreUint64(&w.lastValueBits, math.Float64bits(metricValue))

w.logger.Debugw("Observation metric incremented",
"metric", w.metricName,
"value", metricValue,
"delta", delta,
"labels", w.labels,
)

if w.publisher != nil {
// Publish the delta, not the cumulative value
w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels)
}
if err := extractCounterValue(m, &metricValue); err != nil {
continue
}
delta := metricValue - w.lastValue
if delta > 0 {
w.lastValue = metricValue
w.logger.Debugw("Observation metric incremented",
"metric", w.metricName,
"value", metricValue,
"delta", delta,
"labels", w.labels,
)
if w.publisher != nil {
w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels)
}
}

// Forward the metric to the actual channel
ch <- m
}
}

Expand All @@ -136,26 +164,6 @@ func extractCounterValue(m prometheus.Metric, value *float64) error {
return errors.New("metric is not a counter")
}

// Describe implements prometheus.Collector
func (c *ObservationMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
if c.sentObservationsCounter != nil {
c.sentObservationsCounter.Describe(ch)
}
if c.includedObservationsCounter != nil {
c.includedObservationsCounter.Describe(ch)
}
}

// Collect implements prometheus.Collector
func (c *ObservationMetricsCollector) Collect(ch chan<- prometheus.Metric) {
if c.sentObservationsCounter != nil {
c.sentObservationsCounter.Collect(ch)
}
if c.includedObservationsCounter != nil {
c.includedObservationsCounter.Collect(ch)
}
}

// interceptingRegisterer wraps a Prometheus registerer to intercept specific metric registrations
type interceptingRegisterer struct {
base prometheus.Registerer
Expand Down
Loading
Loading