Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613
* [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478
* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
Expand Down
14 changes: 14 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

var newSeries []labels.Labels

delayObserver := i.metrics.ingestionDelaySeconds.WithLabelValues(userID)
nowMs := time.Now().UnixMilli()
observeDelay := func(timestampMs int64) {
if delayMs := nowMs - timestampMs; delayMs >= 0 {
delayObserver.Observe(float64(delayMs) / 1000.0)
}
}

for _, ts := range req.Timeseries {
// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
Expand Down Expand Up @@ -1545,6 +1553,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
for _, s := range ts.Samples {
var err error

// Observe ingestion delay for all samples (accepted and rejected)
observeDelay(s.TimestampMs)

if s.StartTimestampMs != 0 && s.TimestampMs != 0 {
// TODO(SungJin1212): Change to AppendSTZeroSample after update the Prometheus v3.9.0+
if _, err = app.AppendCTZeroSample(ref, copiedLabels, s.TimestampMs, s.StartTimestampMs); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
Expand Down Expand Up @@ -1591,6 +1602,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
fh *histogram.FloatHistogram
)

// Observe ingestion delay for all histograms (accepted and rejected)
observeDelay(hp.TimestampMs)

// Choose the decoder based on the histogram's proto type (the
// CountInt/CountFloat oneof), not the count value. A float
// histogram with a count of 0 (e.g. a staleness marker or an
Expand Down
187 changes: 187 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2892,6 +2892,193 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
require.NoError(t, err)
}

func TestIngester_IngestionDelayMetric(t *testing.T) {
metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)

t.Run("float samples with delays", func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return ing.lifecycler.GetState()
})

userID := "user-1"
ctx := user.InjectOrgID(context.Background(), userID)

// Push samples with different delays (oldest first to avoid OOO)
now := time.Now().UnixMilli()
samples := []cortexpb.Sample{
{Value: 1, TimestampMs: now - 30000}, // 30 seconds ago
{Value: 2, TimestampMs: now - 10000}, // 10 seconds ago
{Value: 3, TimestampMs: now - 5000}, // 5 seconds ago
}

for _, sample := range samples {
req := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{sample},
nil,
nil,
cortexpb.API,
)
_, err = ing.Push(ctx, req)
require.NoError(t, err)
}

// Verify metric exists and has 3 observations
metricFamily, err := registry.Gather()
require.NoError(t, err)

var found bool
var sampleCount uint64
var sampleSum float64
for _, mf := range metricFamily {
if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" {
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "user" && label.GetValue() == userID {
found = true
if metric.Histogram != nil {
sampleCount = metric.Histogram.GetSampleCount()
sampleSum = metric.Histogram.GetSampleSum()
}
}
}
}
}
}

require.True(t, found, "ingestion delay metric not found")
assert.Equal(t, uint64(3), sampleCount, "expected 3 observations")

// Verify delays were actually measured (sum should be positive and reasonable)
// We sent samples 30s, 10s, and 5s old, so total delay should be ~45s (with some execution time added)
assert.Greater(t, sampleSum, 40.0, "sum of delays should be at least 40s")
})

t.Run("future timestamps are filtered", func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return ing.lifecycler.GetState()
})

userID := "user-future"
ctx := user.InjectOrgID(context.Background(), userID)

// Push sample with future timestamp
futureTimestamp := time.Now().UnixMilli() + 60000 // 60 seconds in the future
req := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: futureTimestamp}},
nil,
nil,
cortexpb.API,
)

_, err = ing.Push(ctx, req)
require.NoError(t, err)

// Verify metric has 0 observations (future timestamp filtered)
metricFamily, err := registry.Gather()
require.NoError(t, err)

for _, mf := range metricFamily {
if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" {
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "user" && label.GetValue() == userID {
if metric.Histogram != nil {
assert.Equal(t, uint64(0), metric.Histogram.GetSampleCount(),
"future timestamp should not be observed")
}
}
}
}
}
}
})

t.Run("per-user isolation", func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return ing.lifecycler.GetState()
})

// Push samples for two users
baseTime := time.Now().UnixMilli()
for _, u := range []struct {
id string
numSamples int
delayMs int64
}{
{"user-a", 2, 5000},
{"user-b", 3, 30000},
} {
ctx := user.InjectOrgID(context.Background(), u.id)
// Send in chronological order (oldest first)
for idx := u.numSamples - 1; idx >= 0; idx-- {
timestamp := baseTime - u.delayMs - int64(idx*1000)
req := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: float64(idx + 1), TimestampMs: timestamp}},
nil,
nil,
cortexpb.API,
)
_, err := ing.Push(ctx, req)
require.NoError(t, err)
}
}

// Verify each user has their own metric
metricFamily, err := registry.Gather()
require.NoError(t, err)

userCounts := make(map[string]uint64)
for _, mf := range metricFamily {
if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" {
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "user" {
userID := label.GetValue()
if metric.Histogram != nil {
userCounts[userID] = metric.Histogram.GetSampleCount()
}
}
}
}
}
}

assert.Equal(t, uint64(2), userCounts["user-a"])
assert.Equal(t, uint64(3), userCounts["user-b"])
})
}

func BenchmarkIngesterPush(b *testing.B) {
limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false)
Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ingesterMetrics struct {
ingestedExemplarsFail prometheus.Counter
ingestedMetadataFail prometheus.Counter
ingestedHistogramBuckets *prometheus.HistogramVec
ingestionDelaySeconds *prometheus.HistogramVec
oooLabelsTotal *prometheus.CounterVec
queries prometheus.Counter
queriedSamples prometheus.Histogram
Expand Down Expand Up @@ -149,6 +150,14 @@ func newIngesterMetrics(r prometheus.Registerer,
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets
}, []string{"user"}),
ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_ingester_ingestion_delay_seconds",
Help: "Delay in seconds between sample ingestion time and sample timestamp.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1,
Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m
}, []string{"user"}),
oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_out_of_order_labels_total",
Help: "The total number of out of order label found per user.",
Expand Down Expand Up @@ -413,6 +422,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID})
m.ingestedHistogramBuckets.DeleteLabelValues(userID)
m.ingestionDelaySeconds.DeleteLabelValues(userID)
Comment thread
Shvejan marked this conversation as resolved.

if m.memSeriesCreatedTotal != nil {
m.memSeriesCreatedTotal.DeleteLabelValues(userID)
Expand Down
17 changes: 16 additions & 1 deletion pkg/ingester/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,31 @@ func TestUnoptimizedRegexRejectedMetric(t *testing.T) {
// Add metrics for multiple users
m.unoptimizedRegexRejectedTotal.WithLabelValues("user1", "pattern_length").Inc()
m.unoptimizedRegexRejectedTotal.WithLabelValues("user2", "cardinality").Inc()
m.ingestionDelaySeconds.WithLabelValues("user1").Observe(10.0)
m.ingestionDelaySeconds.WithLabelValues("user2").Observe(5.0)

// Delete user1 metrics
m.deletePerUserMetrics("user1")

// Only user2 metrics should remain
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_ingestion_delay_seconds Delay in seconds between sample ingestion time and sample timestamp.
# TYPE cortex_ingester_ingestion_delay_seconds histogram
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="1"} 0
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="5"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="10"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="30"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="60"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="120"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="300"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="600"} 1
cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="+Inf"} 1
cortex_ingester_ingestion_delay_seconds_sum{user="user2"} 5
cortex_ingester_ingestion_delay_seconds_count{user="user2"} 1
# HELP cortex_ingester_unoptimized_regex_rejected_requests_total Total number of requests rejected due to unoptimized regex matcher limits per user and reason.
# TYPE cortex_ingester_unoptimized_regex_rejected_requests_total counter
cortex_ingester_unoptimized_regex_rejected_requests_total{reason="cardinality",user="user2"} 1
`), "cortex_ingester_unoptimized_regex_rejected_requests_total")
`), "cortex_ingester_unoptimized_regex_rejected_requests_total", "cortex_ingester_ingestion_delay_seconds")
require.NoError(t, err)
})
}
Expand Down