Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ linters:
- gocritic
- godot
- gofumpt
- gosec
- goimports
- misspell
- nolintlint
Expand All @@ -39,12 +40,12 @@ issues:
exclude-files:
# Skip autogenerated files.
- ^.*\.(pb|y)\.go$
# cgo bridge to the C++ core: identifiers intentionally mirror the C++ ABI.
- ^pp/go/cppbridge/entrypoint\.go$
exclude-dirs:
# Copied from a different source.
- storage/remote/otlptranslator/prometheusremotewrite
- storage/remote/otlptranslator/prometheus
- pp
- pp-pkg
exclude-rules:
- linters:
- errcheck
Expand Down Expand Up @@ -74,7 +75,29 @@ issues:
- linters:
- perfsprint
text: "fmt.Sprintf can be replaced with string concatenation"
# gosec is too noisy in tests (weak rand, file perms, unhandled errors in helpers).
- path: _test\.go
linters:
- gosec
# cppbridge identifiers intentionally mirror the C++ ABI (Id, snake_case fields).
- path: ^pp/go/cppbridge/
linters:
- revive
text: "var-naming"
# fastcgo declares Go mirrors of runtime structs purely for memory layout.
- path: ^pp/go/cppbridge/fastcgo/
linters:
- unused
linters-settings:
govet:
enable:
- shadow
gosec:
excludes:
# G103: use of unsafe is intentional and pervasive in the C++ bridge.
- G103
# G104: unhandled errors are already covered (with excludes) by errcheck.
- G104
depguard:
rules:
main:
Expand Down
2 changes: 1 addition & 1 deletion pp-pkg/handler/adapter/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type RemoteWrite struct {
contentLength int
}

// NewRefill init new RemoteWrite.
// NewRemoteWrite initializes a new RemoteWrite.
func NewRemoteWrite(
reader io.ReadCloser,
writer http.ResponseWriter,
Expand Down
4 changes: 2 additions & 2 deletions pp-pkg/handler/otlp_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func (h *OTLPWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

converter := NewPPConverter(h.logger, req.Metrics().MetricCount())
if err := converter.FromMetrics(req.Metrics()); err != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
if convErr := converter.FromMetrics(req.Metrics()); convErr != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", convErr)
}

stats, err := h.adapter.AppendTimeSeries(
Expand Down
4 changes: 2 additions & 2 deletions pp-pkg/handler/pp_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *PPConverterSuite) TestEmptyLabelsName() {

actual := ppconverter.TimeSeries().TimeSeries()

s.Require().Len(actual, 0)
s.Require().Empty(actual)
}

func (s *PPConverterSuite) TestEmptyLabelsValue() {
Expand All @@ -92,7 +92,7 @@ func (s *PPConverterSuite) TestEmptyLabelsValue() {

actual := ppconverter.TimeSeries().TimeSeries()

s.Require().Len(actual, 0)
s.Require().Empty(actual)
}

func createExportRequest(
Expand Down
3 changes: 2 additions & 1 deletion pp-pkg/handler/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package handler_test
import (
"testing"

"github.com/stretchr/testify/suite"

"github.com/prometheus/prometheus/config"
pp_pkg_config "github.com/prometheus/prometheus/pp-pkg/config"
"github.com/prometheus/prometheus/pp-pkg/handler"
"github.com/prometheus/prometheus/pp/go/cppbridge"
rconfig "github.com/prometheus/prometheus/pp/go/relabeler/config"
"github.com/stretchr/testify/suite"
)

type StatesStorageSuite struct {
Expand Down
5 changes: 3 additions & 2 deletions pp-pkg/handler/storage/block/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ func (s *Storage) Writer(blockID uuid.UUID, shardID uint16, shardLog, segmentEnc
fileFn: func() (*os.File, error) {
fileName := path.Join(s.dir, fmt.Sprintf("%s-%d", blockID.String(), shardID))
dir := filepath.Dir(fileName)
if err := os.MkdirAll(dir, 0o744); err != nil {
if err := os.MkdirAll(dir, 0o750); err != nil {
return nil, err
}
return os.OpenFile(fileName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644)
//nolint:gosec // fileName is derived from the configured storage dir and block/shard IDs
return os.OpenFile(fileName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o600)
},
blockHeader: storage.BlockHeader{
FileVersion: FileVersion,
Expand Down
5 changes: 3 additions & 2 deletions pp-pkg/remote/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package remote

import (
"crypto/md5"
"crypto/md5" //nolint:gosec // md5 is used only to fingerprint configs, not for security
"encoding/hex"
"time"

"gopkg.in/yaml.v2"

"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pp/go/storage/remotewriter"
)
Expand Down Expand Up @@ -53,6 +54,6 @@ func toHash(data interface{}) (string, error) {
if err != nil {
return "", err
}
hash := md5.Sum(bytes)
hash := md5.Sum(bytes) //nolint:gosec // md5 is used only to fingerprint configs, not for security
return hex.EncodeToString(hash[:]), nil
}
9 changes: 5 additions & 4 deletions pp-pkg/rules/alerting_stuck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
)

// scriptedQuery returns a QueryFunc that hands out canned promql.Vector values
Expand Down Expand Up @@ -85,9 +86,9 @@ func TestAlertingRule_OnePassEmpty_TransitionsToInactive(t *testing.T) {
F: 0,
}}
q, _ := scriptedQuery([]promql.Vector{
hot, // t=0 : pending starts (new entry in r.active)
hot, // t=1m : transitions to firing (holdDuration met)
nil, // t=2m : empty → must transition to Inactive
hot, // t=0 : pending starts (new entry in r.active)
hot, // t=1m : transitions to firing (holdDuration met)
nil, // t=2m : empty → must transition to Inactive
})

t0 := time.Unix(0, 0).UTC()
Expand Down
8 changes: 4 additions & 4 deletions pp-pkg/rules/alerting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
res, evalErr := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, evalErr)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
for _, smpl := range res {
Expand Down Expand Up @@ -885,8 +885,8 @@ func TestKeepFiringFor(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
res, evalErr := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, evalErr)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
for _, smpl := range res {
Expand Down
3 changes: 2 additions & 1 deletion pp-pkg/rules/control_plane_expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/promql/promqltest"
"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/promql/promqltest"
)

const cpmExpr = `max by (node) (kube_node_role{role="master"} unless kube_node_role{role="master"}` +
Expand Down
23 changes: 11 additions & 12 deletions pp-pkg/rules/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ func (g *Group) run(ctx context.Context) {
level.Error(g.logger).Log("msg", "Failed to commit batch storage", "err", err)
return
}

}
}(time.Now())
}()
Expand Down Expand Up @@ -939,7 +938,7 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, bs storage.Ba
}

logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
ctx, sp := otel.Tracer("").Start(ctx, "rule")
spanCtx, sp := otel.Tracer("").Start(ctx, "rule")
sp.SetAttributes(attribute.String("name", rule.Name()))
defer func(t time.Time) {
sp.End()
Expand All @@ -956,7 +955,7 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, bs storage.Ba

g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

vector, err := rule.Eval(ctx, ruleQueryOffset, ts, queryFunc, g.opts.ExternalURL, g.Limit())
vector, err := rule.Eval(spanCtx, ruleQueryOffset, ts, queryFunc, g.opts.ExternalURL, g.Limit())
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
Expand All @@ -976,7 +975,7 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, bs storage.Ba
samplesTotal.Add(float64(len(vector)))

if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
ar.sendAlerts(spanCtx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}

mtx.Lock()
Expand Down Expand Up @@ -1073,7 +1072,7 @@ func (g *Group) sequentiallyEval(
}

logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
ctx, sp := otel.Tracer("").Start(ctx, "rule")
spanCtx, sp := otel.Tracer("").Start(ctx, "rule")
sp.SetAttributes(attribute.String("name", rule.Name()))
defer func(t time.Time) {
sp.End()
Expand All @@ -1090,7 +1089,7 @@ func (g *Group) sequentiallyEval(

g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

vector, err := rule.Eval(ctx, ruleQueryOffset, ts, queryFunc, g.opts.ExternalURL, g.Limit())
vector, err := rule.Eval(spanCtx, ruleQueryOffset, ts, queryFunc, g.opts.ExternalURL, g.Limit())
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
Expand All @@ -1110,18 +1109,18 @@ func (g *Group) sequentiallyEval(
samplesTotal += float64(len(vector))

if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
ar.sendAlerts(spanCtx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}

app := bs.Appender(ctx)
app := bs.Appender(spanCtx)
defer func() {
if err := app.Commit(); err != nil {
if commitErr := app.Commit(); commitErr != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
sp.SetStatus(codes.Error, err.Error())
rule.SetLastError(commitErr)
sp.SetStatus(codes.Error, commitErr.Error())
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

level.Warn(logger).Log("msg", "Rule sample appending failed", "err", err)
level.Warn(logger).Log("msg", "Rule sample appending failed", "err", commitErr)
return
}
}()
Expand Down
2 changes: 1 addition & 1 deletion pp-pkg/rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.FPoint, error) {
series := ss.At()

points := []promql.FPoint{}
it := series.Iterator(it)
it = series.Iterator(it)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
points = append(points, promql.FPoint{T: t, F: v})
Expand Down
1 change: 1 addition & 0 deletions pp-pkg/scrape/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/klauspost/compress/gzip"

"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
Expand Down
8 changes: 4 additions & 4 deletions pp-pkg/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,12 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
}

t := sp.activeTargets[fp]
interval, timeout, err := t.intervalAndTimeout(interval, timeout)
iterInterval, iterTimeout, err := t.intervalAndTimeout(interval, timeout)
var (
s = &targetScraper{
Target: t,
client: sp.client,
timeout: timeout,
timeout: iterTimeout,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, validationScheme),
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
Expand All @@ -465,8 +465,8 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
trackTimestampsStaleness: trackTimestampsStaleness,
mrc: mrc,
cache: cache,
interval: interval,
timeout: timeout,
interval: iterInterval,
timeout: iterTimeout,
validationScheme: validationScheme,
})
)
Expand Down
10 changes: 5 additions & 5 deletions pp-pkg/scrape/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,13 @@ func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig, noDefaultPort
// If the address is not valid, we don't append a port either.
addPort := func(s string) (string, string, bool) {
// If we can split, a port exists and we don't have to add one.
if host, port, err := net.SplitHostPort(s); err == nil {
if host, port, splitErr := net.SplitHostPort(s); splitErr == nil {
return host, port, false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return "", "", err == nil
_, _, checkErr := net.SplitHostPort(s + ":1234")
return "", "", checkErr == nil
}

addr := lb.Get(model.AddressLabel)
Expand Down Expand Up @@ -394,8 +394,8 @@ func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig, noDefaultPort
}
}

if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), err
if addrErr := config.CheckTargetAddress(model.LabelValue(addr)); addrErr != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), addrErr
}

interval := lb.Get(model.ScrapeIntervalLabel)
Expand Down
22 changes: 11 additions & 11 deletions pp-pkg/storage/adapter_promql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (s *AdapterPromQLSuite) TestCPM_PodReplaced_AcrossRotation_PromppStorage()
innerV := s.queryAt(
`(kube_pod_status_ready{condition="true"} == 1) * on (pod, namespace) group_right () `+
`kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system"}`, t20m)
if !s.Equal(3, len(innerV), "INNER must yield exactly 3 series at t=20m — got %d:\n%v", len(innerV), innerV) {
if !s.Len(innerV, 3, "INNER must yield exactly 3 series at t=20m — got %d:\n%v", len(innerV), innerV) {
for i, ss := range innerV {
s.T().Logf(" inner[%d] = %s", i, ss.Metric.String())
}
Expand Down Expand Up @@ -769,11 +769,11 @@ func (s *AdapterPromQLSuite) TestCPM_RecordingRuleMissedIterations_AlertStaysFir
// cppbridge head ingest path needs more than 19 ms to make the new sample
// visible to a querier. Result:
//
// - alert eval @ T+6.329 sees the LATEST committed sample at T-60+6.308
// (previous minute) → distance = 60.021 s > LookbackDelta (60 s)
// - INNER subexpression is empty → `unless` cancels nothing
// - `max by (node)` returns all three masters → alert FIRING
// - API queries (run later) see the new sample already committed → empty
// - alert eval @ T+6.329 sees the LATEST committed sample at T-60+6.308
// (previous minute) → distance = 60.021 s > LookbackDelta (60 s)
// - INNER subexpression is empty → `unless` cancels nothing
// - `max by (node)` returns all three masters → alert FIRING
// - API queries (run later) see the new sample already committed → empty
//
// The fix is to add `query_offset` (per-group or global). An offset of 30 s
// pushes alert eval to ask storage for time `T-23.671`; the previous-minute
Expand All @@ -787,11 +787,11 @@ func (s *AdapterPromQLSuite) TestCPM_RecordingRuleMissedIterations_AlertStaysFir
// behind the alert eval timestamp).
func (s *AdapterPromQLSuite) TestCPM_OffsetCollisionRace_FixedByQueryOffset() {
const (
stepMs = int64(60_000)
offsetMs = int64(6_000) // shared offset of both groups inside the minute
alertJitterMs = int64(329) // alert eval starts 329 ms after minute+offset (matches prod 6.329)
nLastWritten = int64(29) // we write kube_controller_pod up to and including this step
alertStep = int64(30) // alert eval happens at this step's tick
stepMs = int64(60_000)
offsetMs = int64(6_000) // shared offset of both groups inside the minute
alertJitterMs = int64(329) // alert eval starts 329 ms after minute+offset (matches prod 6.329)
nLastWritten = int64(29) // we write kube_controller_pod up to and including this step
alertStep = int64(30) // alert eval happens at this step's tick
)

var points []scrapePoint
Expand Down
Loading
Loading