Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/compliance-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ jobs:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- run: cd remotewrite && make sender
# Skip ST tests on CI as Prometheus does not implement those yet (see PROM-60).
- run: cd remotewrite && make sender PROMETHEUS_RW_COMPLIANCE_SKIP_TEST_RE="TestSample/rw2/prometheus/start_timestamp*"
5 changes: 3 additions & 2 deletions remotewrite/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ RESULT_FILE="$(ROOT_DIR)/results.json"

# Go test arguments and the base execution command.
BASE_TEST_ARGS = -run $(PROMETHEUS_RW_COMPLIANCE_TEST_RE) -skip $(PROMETHEUS_RW_COMPLIANCE_SKIP_TEST_RE)
RUN_TEST = cd ./$(COMPONENT) && DEBUG=$(DEBUG) $(COMPONENT_ENV) go test $(BASE_TEST_ARGS)
RUN_TEST = cd ./$(COMPONENT) && GOWORK=off DEBUG=$(DEBUG) $(COMPONENT_ENV) go test $(BASE_TEST_ARGS)

# Use target-specific variables to dynamically inject directories and environment variables.
sender sender-%: PROMETHEUS_COMPLIANCE_RW_SENDERS="prometheus"
sender sender-%: PROMETHEUS_COMPLIANCE_RW_PROCESS_CMD=""
sender sender-%: COMPONENT = sender
sender sender-%: COMPONENT_ENV = PROMETHEUS_COMPLIANCE_RW_SENDERS=$(PROMETHEUS_COMPLIANCE_RW_SENDERS)
sender sender-%: COMPONENT_ENV = PROMETHEUS_COMPLIANCE_RW_SENDERS=$(PROMETHEUS_COMPLIANCE_RW_SENDERS) PROMETHEUS_COMPLIANCE_RW_PROCESS_CMD=$(PROMETHEUS_COMPLIANCE_RW_PROCESS_CMD)

receiver receiver-%: COMPONENT = receiver
receiver receiver-%: COMPONENT_ENV = PROMETHEUS_RW2_COMPLIANCE_RECEIVERS=$(PROMETHEUS_RW2_COMPLIANCE_RECEIVERS)
Expand Down
1 change: 0 additions & 1 deletion remotewrite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ For testing custom senders:

* Add target running code and register it the `sender/main_test.go`.

TODO: Implement that.
* Use custom process target via `PROMETHEUS_COMPLIANCE_RW_SENDERS="process"` and `PROMETHEUS_COMPLIANCE_RW_PROCESS_BINARY=<path>` envvars.

**Debug output:**
Expand Down
5 changes: 4 additions & 1 deletion remotewrite/sender/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ go 1.25.0

require (
github.com/golang/snappy v1.0.0
github.com/oklog/run v1.2.0
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a
github.com/prometheus/prometheus v0.307.4-0.20251119130332-1174b0ce4f1f
github.com/stretchr/testify v1.11.1
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.2 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/text v0.30.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 6 additions & 0 deletions remotewrite/sender/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 h1:cLN4IBkmkYZNnk7E
github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E=
github.com/oklog/run v1.2.0/go.mod h1:mgDbKRSwPhJfesJ4PntqFUbKQRZ50NgmZTSPlFA0YFk=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a h1:RF1vfKM34/3DbGNis22BGd6sDDY3XBi0eM7pYqmOEO0=
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a/go.mod h1:FGJuwvfcPY0V5enm+w8zF1RNS062yugQtPPQp1c4Io4=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.67.2 h1:PcBAckGFTIHt2+L3I33uNRTlKTplNzFctXcWhPyAEN8=
Expand Down
127 changes: 59 additions & 68 deletions remotewrite/sender/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@ import (
"io"
"net/http"
"net/http/httptest"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/golang/snappy"
"github.com/prometheus/compliance/remotewrite/sender/targets"
writev1 "github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/stretchr/testify/require"
)

// CapturedRequest represents a captured HTTP request from a sender.
// DEPRECATED: To kill, use sendertest.
type CapturedRequest struct {
Headers http.Header
Body []byte
Request *writev2.Request
}

// MockReceiver is an HTTP server that captures remote write requests.
// DEPRECATED: To kill, use sendertest.
type MockReceiver struct {
server *httptest.Server
mu sync.Mutex
Expand All @@ -45,6 +48,7 @@ type MockReceiver struct {
}

// MockReceiverResponse configures the response behavior of the mock receiver.
// DEPRECATED: To kill, use sendertest.
type MockReceiverResponse struct {
StatusCode int
Headers map[string]string
Expand All @@ -55,6 +59,7 @@ type MockReceiverResponse struct {
}

// NewMockReceiver creates a new mock HTTP receiver for testing senders.
// DEPRECATED: To kill, use sendertest.
func NewMockReceiver() *MockReceiver {
mr := &MockReceiver{
requests: make([]CapturedRequest, 0),
Expand Down Expand Up @@ -178,26 +183,8 @@ func (mr *MockReceiver) SetResponse(resp MockReceiverResponse) {
mr.response = resp
}

// WaitForRequests waits for at least n requests to be captured, with configurable timeout.
// Polls every 100ms. Returns all captured requests.
func (mr *MockReceiver) WaitForRequests(n int, timeout time.Duration) []CapturedRequest {
deadline := time.Now().Add(timeout)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for time.Now().Before(deadline) {
requests := mr.GetRequests()
if len(requests) >= n {
return requests
}
<-ticker.C
}

// Return whatever we got on timeout
return mr.GetRequests()
}

// MockScrapeTarget is an HTTP server that serves metrics in Prometheus format.
// DEPRECATED: To kill, use sendertest.
type MockScrapeTarget struct {
server *httptest.Server
mu sync.Mutex
Expand Down Expand Up @@ -308,13 +295,15 @@ func extractExemplarLabels(ex *writev2.Exemplar, symbols []string) map[string]st

// must marks a test as having a "MUST" RFC compliance level.
// Tests marked with must() will fail on assertion failures.
// DEPRECATED: To kill, use sendertest.
func must(t *testing.T) *require.Assertions {
t.Helper()
t.Attr("rfcLevel", "MUST")
return require.New(t)
}

// should marks a test as having a "SHOULD" RFC compliance level.
// DEPRECATED: To kill, use sendertest.
func should(t *testing.T, condition bool, msg string) {
t.Helper()
t.Attr("rfcLevel", "SHOULD")
Expand All @@ -324,6 +313,7 @@ func should(t *testing.T, condition bool, msg string) {
}

// may marks a test as having a "MAY" RFC compliance level.
// DEPRECATED: To kill, use sendertest.
func may(t *testing.T, condition bool, msg string) {
t.Helper()
t.Attr("rfcLevel", "MAY")
Expand All @@ -333,6 +323,7 @@ func may(t *testing.T, condition bool, msg string) {
}

// recommended marks a test as having an "RECOMMENDED" compliance level.
// DEPRECATED: To kill, use sendertest.
func recommended(t *testing.T, condition bool, msg string) {
t.Helper()
t.Attr("rfcLevel", "RECOMMENDED")
Expand All @@ -341,44 +332,10 @@ func recommended(t *testing.T, condition bool, msg string) {
}
}

// validateSymbolTable validates that the symbol table follows RW 2.0 requirements.
func validateSymbolTable(t *testing.T, symbols []string) {
t.Helper()

must(t).NotEmpty(symbols, "Symbol table must not be empty")
must(t).Equal("", symbols[0], "First symbol (index 0) must be empty string")

// Check for duplicates (MUST requirement for deduplication).
seen := make(map[string]bool)
for _, sym := range symbols {
if seen[sym] && sym != "" {
// Duplicate non-empty strings found - this violates deduplication requirement.
must(t).Fail(fmt.Sprintf("Duplicate symbol found in symbol table: %q", sym))
}
seen[sym] = true
}
}

// validateLabelRefs validates that label references are valid.
func validateLabelRefs(t *testing.T, refs []uint32, symbols []string) {
t.Helper()

must(t).Equal(0, len(refs)%2, "Label refs length must be even (key-value pairs)")

for i, ref := range refs {
must(t).Less(int(ref), len(symbols),
"Label ref at index %d points to invalid symbol index %d (symbol table size: %d)",
i, ref, len(symbols))
}
}

// isSorted checks if labels are sorted lexicographically.
func isSorted(labels map[string]string, symbols []string, refs []uint32) bool {
// isSorted checks if label names are sorted lexicographically.
func isSorted(symbols []string, refs []uint32) bool {
var prevKey string
for i := 0; i < len(refs); i += 2 {
if i+1 >= len(refs) {
break
}
keyRef := refs[i]
if int(keyRef) >= len(symbols) {
return false
Expand All @@ -392,13 +349,31 @@ func isSorted(labels map[string]string, symbols []string, refs []uint32) bool {
return true
}

// isSortedRW1 checks if label names are sorted lexicographically.
func isSortedRW1(labels []writev1.Label) bool {
return sort.SliceIsSorted(labels, func(i, j int) bool {
return strings.Compare(labels[i].Name, labels[j].Name) < 0
})
}

func TestIsSorted(t *testing.T) {
symbols := []string{"", "a", "c", "b", "x", "__name__"}
require.True(t, isSorted(symbols, []uint32{5, 1, 1, 1, 3, 1, 2, 1, 4, 1}))
require.False(t, isSorted(symbols, []uint32{5, 1, 1, 1, 2, 1, 3, 1, 4, 1}))

require.True(t, isSortedRW1([]writev1.Label{{Name: "__name__"}, {Name: "a"}, {Name: "b"}, {Name: "c"}}))
require.False(t, isSortedRW1([]writev1.Label{{Name: "__name__"}, {Name: "a"}, {Name: "x"}, {Name: "c"}}))
require.False(t, isSortedRW1([]writev1.Label{{Name: "a"}, {Name: "b"}, {Name: "c"}, {Name: "__name__"}}))
}

// containsExemplars checks if the metrics string contains exemplar annotations.
// Exemplars in OpenMetrics format are indicated by "# {" after a metric value.
func containsExemplars(metrics string) bool {
return strings.Contains(metrics, "# {")
}

// TestCase represents a single test case for compliance testing.
// DEPRECATED: To kill, use sendertest.
type TestCase struct {
Name string
Description string
Expand All @@ -408,6 +383,7 @@ type TestCase struct {
}

// runTestCases is a helper that eliminates the common test table runner pattern.
// DEPRECATED: To kill, use sendertest.
func runTestCases(t *testing.T, tests []TestCase) {
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
Expand All @@ -426,10 +402,10 @@ func runTestCases(t *testing.T, tests []TestCase) {
}

// findTimeseriesByMetricName finds a timeseries by metric name from a captured request.
func findTimeseriesByMetricName(req *CapturedRequest, metricName string) (*writev2.TimeSeries, map[string]string) {
for i := range req.Request.Timeseries {
ts := &req.Request.Timeseries[i]
labels := extractLabels(ts, req.Request.Symbols)
func findTimeseriesByMetricName(req *writev2.Request, metricName string) (*writev2.TimeSeries, map[string]string) {
for i := range req.Timeseries {
ts := &req.Timeseries[i]
labels := extractLabels(ts, req.Symbols)
if labels["__name__"] == metricName {
return ts, labels
}
Expand All @@ -438,21 +414,36 @@ func findTimeseriesByMetricName(req *CapturedRequest, metricName string) (*write
}

// requireTimeseriesByMetricName finds a timeseries by metric name and fails the test if not found.
func requireTimeseriesByMetricName(t *testing.T, req *CapturedRequest, metricName string) (*writev2.TimeSeries, map[string]string) {
func requireTimeseriesByMetricName(t *testing.T, req *writev2.Request, metricName string) (*writev2.TimeSeries, map[string]string) {
t.Helper()
ts, labels := findTimeseriesByMetricName(req, metricName)
must(t).NotNil(ts, "Timeseries with metric name %q must be present", metricName)
require.NotNil(t, ts, "Timeseries with metric name %q must be present", metricName)
return ts, labels
}

// requireTimeseriesRW1ByMetricName finds a timeseries by metric name and fails the test if not found.
func requireTimeseriesRW1ByMetricName(t *testing.T, req *writev1.WriteRequest, metricName string) *writev1.TimeSeries {
t.Helper()

for i := range req.Timeseries {
for _, l := range req.Timeseries[i].Labels {
if l.Name == "__name__" && l.Value == metricName {
return &req.Timeseries[i]
}
}
}
t.Fatalf("Timeseries with metric name %q must be present", metricName)
return nil
}

// findHistogramData attempts to find histogram data in both classic and native formats.
// Returns (classicFound, nativeTS) where:
// - classicFound: true if classic histogram metrics (_count, _sum, _bucket) are found
// - nativeTS: pointer to timeseries containing native histogram, or nil if not found
func findHistogramData(req *CapturedRequest, baseName string) (classicFound bool, nativeTS *writev2.TimeSeries) {
for i := range req.Request.Timeseries {
ts := &req.Request.Timeseries[i]
labels := extractLabels(ts, req.Request.Symbols)
func findHistogramData(req *writev2.Request, baseName string) (classicFound bool, nativeTS *writev2.TimeSeries) {
for i := range req.Timeseries {
ts := &req.Timeseries[i]
labels := extractLabels(ts, req.Symbols)
metricName := labels["__name__"]

// Check for classic histogram components.
Expand All @@ -470,7 +461,7 @@ func findHistogramData(req *CapturedRequest, baseName string) (classicFound bool

// extractHistogramCount extracts count from either classic or native histogram format.
// Returns (count, found) where found indicates if count was successfully extracted.
func extractHistogramCount(req *CapturedRequest, baseName string) (float64, bool) {
func extractHistogramCount(req *writev2.Request, baseName string) (float64, bool) {
// Try classic format first.
ts, _ := findTimeseriesByMetricName(req, baseName+"_count")
if ts != nil && len(ts.Samples) > 0 {
Expand All @@ -494,7 +485,7 @@ func extractHistogramCount(req *CapturedRequest, baseName string) (float64, bool
}

// extractHistogramSum extracts sum from either classic or native histogram format.
func extractHistogramSum(req *CapturedRequest, baseName string) (float64, bool) {
func extractHistogramSum(req *writev2.Request, baseName string) (float64, bool) {
// Try classic format first.
ts, _ := findTimeseriesByMetricName(req, baseName+"_sum")
if ts != nil && len(ts.Samples) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions remotewrite/sender/histograms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ test_histogram_bucket{le="+Inf"} 10
// Note: This is a classic histogram, not native histogram.
// Native histograms use exponential buckets notation.
// For classic histograms, senders typically send as multiple timeseries.
classicFound, nativeTS := findHistogramData(req, "test_histogram")
classicFound, nativeTS := findHistogramData(req.Request, "test_histogram")
must(t).True(classicFound || nativeTS != nil,
"Histogram data must be present (either as count/sum/bucket or native format)")
},
Expand All @@ -52,7 +52,7 @@ test_histogram_sum 250.5
test_histogram_bucket{le="+Inf"} 100
`,
Validator: func(t *testing.T, req *CapturedRequest) {
count, found := extractHistogramCount(req, "test_histogram")
count, found := extractHistogramCount(req.Request, "test_histogram")
may(t, found, "Histogram count should be present in some form")
if found {
must(t).Equal(100.0, count, "Histogram count value must be correct")
Expand All @@ -69,7 +69,7 @@ test_histogram_sum 250.5
test_histogram_bucket{le="+Inf"} 100
`,
Validator: func(t *testing.T, req *CapturedRequest) {
sum, found := extractHistogramSum(req, "test_histogram")
sum, found := extractHistogramSum(req.Request, "test_histogram")
may(t, found, "Histogram sum should be present in some form")
if found {
must(t).Equal(250.5, sum, "Histogram sum value must be correct")
Expand Down
2 changes: 1 addition & 1 deletion remotewrite/sender/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestLabelValidation_Old(t *testing.T) {
for _, ts := range req.Request.Timeseries {
labels := extractLabels(&ts, req.Request.Symbols)
if labels["__name__"] == "test_metric" {
must(t).True(isSorted(labels, req.Request.Symbols, ts.LabelsRefs),
must(t).True(isSorted(req.Request.Symbols, ts.LabelsRefs),
"Labels must be sorted in lexicographic order")
break
}
Expand Down
Loading