From 4ad8fc258188ba953393d353ab32820468ae63b3 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 21 Apr 2026 05:48:16 +0000 Subject: [PATCH 1/3] adding ingestion delay metric Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 1 + pkg/ingester/ingester.go | 12 +++ pkg/ingester/ingester_test.go | 187 ++++++++++++++++++++++++++++++++++ pkg/ingester/metrics.go | 10 ++ 4 files changed, 210 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 974a16d07a..2c955d3b91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613 * [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #6748 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 06d9f13747..a6963cd5f6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1517,6 +1517,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte var newSeries []labels.Labels + delayObserver := i.metrics.ingestionDelaySeconds.WithLabelValues(userID) + nowMs := time.Now().UnixMilli() + observeDelay := func(timestampMs int64) { + if delayMs := nowMs - timestampMs; delayMs >= 0 { + delayObserver.Observe(float64(delayMs) / 1000.0) + } + } + for _, ts := range req.Timeseries { // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). @@ -1557,6 +1565,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil { succeededSamplesCount++ + observeDelay(s.TimestampMs) continue } @@ -1568,6 +1577,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededSamplesCount++ + observeDelay(s.TimestampMs) continue } } @@ -1615,6 +1625,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil { succeededHistogramsCount++ + observeDelay(hp.TimestampMs) ingestedBucketsObserver.Observe(float64(hp.BucketCount())) continue } @@ -1627,6 +1638,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededHistogramsCount++ + observeDelay(hp.TimestampMs) ingestedBucketsObserver.Observe(float64(hp.BucketCount())) continue } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1f33a0ebc9..a886afedfe 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2892,6 +2892,193 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) { require.NoError(t, err) } +func TestIngester_IngestionDelayMetric(t *testing.T) { + metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} + metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters) + + t.Run("float samples with delays", func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + userID := "user-1" + ctx := user.InjectOrgID(context.Background(), userID) + + // Push samples with different delays (oldest first to avoid OOO) + now := time.Now().UnixMilli() + samples := []cortexpb.Sample{ + {Value: 1, TimestampMs: now - 30000}, // 30 seconds ago + {Value: 2, TimestampMs: now - 10000}, // 10 seconds ago + {Value: 3, TimestampMs: now - 5000}, // 5 seconds ago + } + + for _, sample := range samples { + req := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{sample}, + nil, + nil, + cortexpb.API, + ) + _, err = ing.Push(ctx, req) + require.NoError(t, err) + } + + // Verify metric exists and has 3 observations + metricFamily, err := registry.Gather() + require.NoError(t, err) + + var found bool + var sampleCount uint64 + var sampleSum float64 + for _, mf := range metricFamily { + if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "user" && label.GetValue() == userID { + found = true + if metric.Histogram != nil { + sampleCount = metric.Histogram.GetSampleCount() + sampleSum = metric.Histogram.GetSampleSum() + } + } + } + } + } + } + + require.True(t, found, "ingestion delay metric not found") + assert.Equal(t, uint64(3), sampleCount, "expected 3 observations") + + // Verify delays were actually measured (sum should be positive and reasonable) + // We sent samples 30s, 10s, and 5s old, so total delay should be ~45s (with some execution time added) + assert.Greater(t, sampleSum, 40.0, "sum of delays should be at least 40s") + }) + + t.Run("future timestamps are filtered", func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + userID := "user-future" + ctx := user.InjectOrgID(context.Background(), userID) + + // Push sample with future timestamp + futureTimestamp := time.Now().UnixMilli() + 60000 // 60 seconds in the future + req := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 1, TimestampMs: futureTimestamp}}, + nil, + nil, + cortexpb.API, + ) + + _, err = ing.Push(ctx, req) + require.NoError(t, err) + + // Verify metric has 0 observations (future timestamp filtered) + metricFamily, err := registry.Gather() + require.NoError(t, err) + + for _, mf := range metricFamily { + if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "user" && label.GetValue() == userID { + if metric.Histogram != nil { + assert.Equal(t, uint64(0), metric.Histogram.GetSampleCount(), + "future timestamp should not be observed") + } + } + } + } + } + } + }) + + t.Run("per-user isolation", func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + // Push samples for two users + baseTime := time.Now().UnixMilli() + for _, u := range []struct { + id string + numSamples int + delayMs int64 + }{ + {"user-a", 2, 5000}, + {"user-b", 3, 30000}, + } { + ctx := user.InjectOrgID(context.Background(), u.id) + // Send in chronological order (oldest first) + for idx := u.numSamples - 1; idx >= 0; idx-- { + timestamp := baseTime - u.delayMs - int64(idx*1000) + req := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: float64(idx + 1), TimestampMs: timestamp}}, + nil, + nil, + cortexpb.API, + ) + _, err := ing.Push(ctx, req) + require.NoError(t, err) + } + } + + // Verify each user has their own metric + metricFamily, err := registry.Gather() + require.NoError(t, err) + + userCounts := make(map[string]uint64) + for _, mf := range metricFamily { + if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "user" { + userID := label.GetValue() + if metric.Histogram != nil { + userCounts[userID] = metric.Histogram.GetSampleCount() + } + } + } + } + } + } + + assert.Equal(t, uint64(2), userCounts["user-a"]) + assert.Equal(t, uint64(3), userCounts["user-b"]) + }) +} + func BenchmarkIngesterPush(b *testing.B) { limits := defaultLimitsTestConfig() benchmarkIngesterPush(b, limits, false) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index bd0ed95ebf..3cc9f5bf01 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -42,6 +42,7 @@ type ingesterMetrics struct { ingestedExemplarsFail prometheus.Counter ingestedMetadataFail prometheus.Counter ingestedHistogramBuckets *prometheus.HistogramVec + ingestionDelaySeconds *prometheus.HistogramVec oooLabelsTotal *prometheus.CounterVec queries prometheus.Counter queriedSamples prometheus.Histogram @@ -149,6 +150,14 @@ func newIngesterMetrics(r prometheus.Registerer, NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets }, []string{"user"}), + ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_ingester_ingestion_delay_seconds", + Help: "Delay in seconds between sample ingestion time and sample timestamp.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1, + Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m + }, []string{"user"}), oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_out_of_order_labels_total", Help: "The total number of out of order label found per user.", @@ -413,6 +422,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) m.ingestedHistogramBuckets.DeleteLabelValues(userID) + m.ingestionDelaySeconds.DeleteLabelValues(userID) if m.memSeriesCreatedTotal != nil { m.memSeriesCreatedTotal.DeleteLabelValues(userID) From 27bf2afad7547e51f8e7f8f1746268377d4d5a10 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Mon, 29 Jun 2026 00:52:43 +0000 Subject: [PATCH 2/3] updating pr id Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c955d3b91..e80213b0d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613 * [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 -* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #6748 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 From 8a5e079dc1ef30ca3cf9eef93f19805e80768799 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Thu, 2 Jul 2026 06:27:54 +0000 Subject: [PATCH 3/3] observe delay for all samples, added unit tests Signed-off-by: Shvejan Mutheboyina --- pkg/ingester/ingester.go | 10 ++++++---- pkg/ingester/metrics_test.go | 17 ++++++++++++++++- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index a6963cd5f6..ff4817fe15 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1553,6 +1553,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte for _, s := range ts.Samples { var err error + // Observe ingestion delay for all samples (accepted and rejected) + observeDelay(s.TimestampMs) + if s.StartTimestampMs != 0 && s.TimestampMs != 0 { // TODO(SungJin1212): Change to AppendSTZeroSample after update the Prometheus v3.9.0+ if _, err = app.AppendCTZeroSample(ref, copiedLabels, s.TimestampMs, s.StartTimestampMs); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { @@ -1565,7 +1568,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil { succeededSamplesCount++ - observeDelay(s.TimestampMs) continue } @@ -1577,7 +1579,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededSamplesCount++ - observeDelay(s.TimestampMs) continue } } @@ -1601,6 +1602,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte fh *histogram.FloatHistogram ) + // Observe ingestion delay for all histograms (accepted and rejected) + observeDelay(hp.TimestampMs) + // Choose the decoder based on the histogram's proto type (the // CountInt/CountFloat oneof), not the count value. A float // histogram with a count of 0 (e.g. a staleness marker or an @@ -1625,7 +1629,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil { succeededHistogramsCount++ - observeDelay(hp.TimestampMs) ingestedBucketsObserver.Observe(float64(hp.BucketCount())) continue } @@ -1638,7 +1641,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededHistogramsCount++ - observeDelay(hp.TimestampMs) ingestedBucketsObserver.Observe(float64(hp.BucketCount())) continue } diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 378ada9cdf..ad10efd41a 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -84,16 +84,31 @@ func TestUnoptimizedRegexRejectedMetric(t *testing.T) { // Add metrics for multiple users m.unoptimizedRegexRejectedTotal.WithLabelValues("user1", "pattern_length").Inc() m.unoptimizedRegexRejectedTotal.WithLabelValues("user2", "cardinality").Inc() + m.ingestionDelaySeconds.WithLabelValues("user1").Observe(10.0) + m.ingestionDelaySeconds.WithLabelValues("user2").Observe(5.0) // Delete user1 metrics m.deletePerUserMetrics("user1") // Only user2 metrics should remain err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_ingester_ingestion_delay_seconds Delay in seconds between sample ingestion time and sample timestamp. + # TYPE cortex_ingester_ingestion_delay_seconds histogram + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="1"} 0 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="5"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="10"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="30"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="60"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="120"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="300"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="600"} 1 + cortex_ingester_ingestion_delay_seconds_bucket{user="user2",le="+Inf"} 1 + cortex_ingester_ingestion_delay_seconds_sum{user="user2"} 5 + cortex_ingester_ingestion_delay_seconds_count{user="user2"} 1 # HELP cortex_ingester_unoptimized_regex_rejected_requests_total Total number of requests rejected due to unoptimized regex matcher limits per user and reason. # TYPE cortex_ingester_unoptimized_regex_rejected_requests_total counter cortex_ingester_unoptimized_regex_rejected_requests_total{reason="cardinality",user="user2"} 1 - `), "cortex_ingester_unoptimized_regex_rejected_requests_total") + `), "cortex_ingester_unoptimized_regex_rejected_requests_total", "cortex_ingester_ingestion_delay_seconds") require.NoError(t, err) }) }