Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Documentation

- docs: comprehensive accuracy audit across 9 doc files — fix wrong flag defaults (`-cb-open-duration` 2s→10s, `goMemLimitPercent` 70→85, `GOGC` 100→200, Go requirement 1.26.2→1.26.3, Loki benchmark stack 3.4.x→3.6.x), correct "not exposed as flags" claim for `-rate-limit-per-second`/`-rate-limit-burst`/`-max-concurrent` in configuration.md + operations.md + scaling.md, add missing Cold Storage Backend section to configuration.md (`-cold-enabled`, `-cold-backend`, `-cold-boundary`, `-cold-overlap`, `-cold-manifest-refresh`, `-cold-timeout`; shipped in 1.28.0 with zero doc coverage), add Go Runtime Tuning flag section, extend compatibility-loki.md release notes from v1.17.1 through v1.31.2, refresh roadmap.md Completed section through v1.31.2, clarify buildinfo `2.9.0` is intentional.

## [1.31.3] - 2026-05-13

### CI
Expand Down
2 changes: 2 additions & 0 deletions docs/KNOWN_ISSUES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ equivalents.
| Startup warm readiness | When patterns or label-values startup warm is configured, readiness can remain `503` until disk restore or peer warm completes. |
| Older VictoriaLogs metadata paths | Newer VictoriaLogs versions let the proxy prefer stream-only metadata APIs. Older versions may fall back to broader field APIs, which can change how strictly stream-shaped some browse endpoints feel. |
| Large body fields | Very large body fields can still be dropped on the VictoriaLogs side. Track the upstream issue: [VictoriaLogs issue #91](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/91). |
| Optional tenant header | The proxy accepts requests without an `X-Scope-OrgID` header and routes them to the default tenant. In deployments where tenant isolation is required, callers must supply the header; the proxy does not enforce a hard rejection. Add `X-Scope-OrgID` enforcement at the ingress/gateway layer if you need a strict tenant boundary. |
| Multi-tenant serial fanout | When `X-Scope-OrgID` contains multiple tenants (e.g. `tenant1\|tenant2\|tenant3`), the proxy issues sub-requests to each tenant sequentially. Latency scales linearly with the number of tenants. For high fan-out counts (>4 tenants), consider running per-tenant Grafana datasources instead of relying on the multi-tenant merge path. |

## What Is No Longer an Open Gap

Expand Down
26 changes: 20 additions & 6 deletions internal/proxy/cold_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ func countNDJSONLines(body []byte) int {
//
// Returns the accumulated NDJSON rows in ascending time order.
func (p *Proxy) coldBackwardChunkedFetch(ctx context.Context, baseParams url.Values, startNs, endNs int64, limit int) ([]byte, error) {
var accumulated []byte
// Chunks are collected newest-to-oldest in a slice and joined at the end to avoid
// O(N²) copy amplification from prepend-into-growing-buffer on each iteration.
var chunks [][]byte
accCount := 0
chunkEnd := endNs
chunkDurNs := coldBackwardChunkDuration.Nanoseconds()
Expand All @@ -137,10 +139,19 @@ func (p *Proxy) coldBackwardChunkedFetch(ctx context.Context, baseParams url.Val
chunkStart = startNs
}

// Cap per-chunk limit to the remaining rows needed; never overfetch.
chunkLimit := limit - accCount
if chunkLimit <= 0 {
break
}
if chunkLimit > maxLimitValue {
chunkLimit = maxLimitValue
}

chunkParams := cloneURLValues(baseParams)
chunkParams.Set("start", strconv.FormatInt(chunkStart, 10))
chunkParams.Set("end", strconv.FormatInt(chunkEnd, 10))
chunkParams.Set("limit", strconv.Itoa(maxLimitValue))
chunkParams.Set("limit", strconv.Itoa(chunkLimit))

resp, err := p.coldRouter.ColdPost(ctx, "/select/logsql/query", chunkParams)
if err != nil {
Expand All @@ -158,17 +169,20 @@ func (p *Proxy) coldBackwardChunkedFetch(ctx context.Context, baseParams url.Val
}

chunkCount := countNDJSONLines(chunkBody)
// Prepend this chunk so the accumulation buffer stays in ascending order
// (oldest chunk first, newest chunk last).
accumulated = append(chunkBody, accumulated...)
chunks = append(chunks, chunkBody)
accCount += chunkCount

if accCount >= limit {
break // have enough rows — older chunks cannot contribute to newest N
}
chunkEnd = chunkStart
}
return accumulated, nil

// chunks is newest-to-oldest; reverse so the joined result is ascending (oldest first).
for i, j := 0, len(chunks)-1; i < j; i, j = i+1, j-1 {
chunks[i], chunks[j] = chunks[j], chunks[i]
}
return bytes.Join(chunks, nil), nil
}

func (p *Proxy) proxyLogQueryCold(w http.ResponseWriter, r *http.Request, logsqlQuery string) {
Expand Down
18 changes: 10 additions & 8 deletions internal/proxy/compat_coverage_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ func TestCompatHelpers_ParseQuantileAndUnwrapErrorName(t *testing.T) {
}

// rate / bytes_rate: sliding window (range != step) always requires manual path.
// When range == step (tumbling window), VL native stats is semantically equivalent,
// including for queries with parser stages.
// Tumbling window (range == step) also requires manual path for parser-stage queries:
// Loki excludes parse-failed lines from metric aggregation; VL native stats counts all.
// Callers that explicitly add "| drop __error__" opt in to VL's count-all semantics;
// that opt-in is checked at the call site (handleStatsCompatRange) before this function.
if !shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "rate", false) {
t.Fatal("expected parser-stage rate to use manual fallback when range != step")
}
if shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "rate", true) {
t.Fatal("expected parser-stage rate to use VL native stats when range == step")
if !shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "rate", true) {
t.Fatal("expected parser-stage rate to use manual fallback even when range == step (no drop-error opt-in)")
}
if !shouldUseManualRangeMetricCompat(`{app="api"}`, "rate", false) {
t.Fatal("expected non-parser rate to use manual fallback when range != step (sliding window)")
Expand Down Expand Up @@ -99,14 +101,14 @@ func TestCompatHelpers_ParseQuantileAndUnwrapErrorName(t *testing.T) {
if !shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "count_over_time", false) {
t.Fatal("expected parser-stage count_over_time to use manual fallback when range != step")
}
if shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "count_over_time", true) {
t.Fatal("expected parser-stage count_over_time to use VL native stats when range == step")
if !shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "count_over_time", true) {
t.Fatal("expected parser-stage count_over_time to use manual fallback even when range == step (no drop-error opt-in)")
}
if !shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "bytes_over_time", false) {
t.Fatal("expected parser-stage bytes_over_time to use manual fallback when range != step")
}
if shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "bytes_over_time", true) {
t.Fatal("expected parser-stage bytes_over_time to use VL native stats when range == step")
if !shouldUseManualRangeMetricCompat(`{app="api"} | unpack_json`, "bytes_over_time", true) {
t.Fatal("expected parser-stage bytes_over_time to use manual fallback even when range == step (no drop-error opt-in)")
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/proxy/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func (p *Proxy) handleMultiTenantFanout(w http.ResponseWriter, r *http.Request,
return true
}

// TODO: fanout is serial — each tenant sub-request blocks the next. For high
// fan-out counts (>4 tenants) parallel dispatch would reduce latency. Tracked
// in docs/KNOWN_ISSUES.md under "Multi-tenant serial fanout".
recorders := make([]*httptest.ResponseRecorder, 0, len(filteredTenants))
for _, tenantID := range filteredTenants {
subReq := filteredReq.Clone(filteredReq.Context())
Expand Down
45 changes: 41 additions & 4 deletions internal/proxy/range_metric_compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type originalRangeMetricSpec struct {
UnwrapField string
UnwrapConv string
HasUnwrap bool
BaseQuery string // inner stream selector + pipeline, without range window [T]
}

type rangeMetricSample struct {
Expand Down Expand Up @@ -129,6 +130,7 @@ func parseOriginalRangeMetricSpec(logql string) (originalRangeMetricSpec, bool)
if spec.Window < 0 {
return originalRangeMetricSpec{}, false
}
spec.BaseQuery = strings.TrimSpace(body[:bracketOpen])

unwrap := rangeMetricUnwrapRE.FindStringSubmatch(body)
if len(unwrap) == 2 {
Expand Down Expand Up @@ -290,11 +292,25 @@ func (p *Proxy) handleStatsCompatRange(w http.ResponseWriter, r *http.Request, o
// (e.g. sum(rate({...} | json [w]))). For non-sliding windows (range == step), VL can
// execute them natively — no manual decomposition needed. For sliding windows (range >
// step), fall through to the manual path which implements correct per-step accumulation.
//
// Exception: queries with parser stages (| json, | logfmt, etc.) without an explicit
// "| drop __error__" opt-in must NOT use VL native stats for tumbling windows. Loki
// excludes parse-failed lines from metric aggregation; VL counts all lines. The manual
// path (collectRangeMetricSamples) preserves Loki's error-exclusion semantics.
if strings.Contains(logsqlQuery, "| math ") {
step, stepOk := parsePositiveStepDuration(r.FormValue("step"))
origSpec, hasOrigSpec := parseOriginalRangeMetricSpec(originalLogql)
if stepOk && hasOrigSpec && origSpec.Window > 0 && origSpec.Window <= step {
return false
spec, specOk := parseStatsCompatSpec(logsqlQuery)
// Parser stages without an explicit drop-error opt-in require the manual path
// to preserve Loki's error-exclusion semantics. Use origSpec.BaseQuery (the inner
// LogQL stream selector + pipeline without the range window) for the drop-error
// check — hasDropErrorOnlyPostParserStage requires the pipeline without outer
// aggregation or range window brackets.
if !specOk || !queryUsesParserStages(spec.BaseQuery) || hasDropErrorOnlyPostParserStage(origSpec.BaseQuery) {
return false
}
// Parser stage without drop-error — fall through to the manual path below.
}
}
spec, ok := parseStatsCompatSpec(logsqlQuery)
Expand All @@ -319,6 +335,13 @@ func (p *Proxy) handleStatsCompatRange(w http.ResponseWriter, r *http.Request, o
// semantically equivalent for VL native stats — only range > step produces
// overlapping sliding windows where VL tumbling-bucket stats diverges from LogQL.
noSlidingOverlap := step > 0 && origSpec.Window > 0 && origSpec.Window <= step
// For tumbling windows, an explicit "| drop __error__" in the original LogQL opts in to
// VL's count-all semantics (parse failures counted). Use origSpec.BaseQuery — the inner
// pipeline without outer aggregation or range brackets — so hasDropErrorOnlyPostParserStage
// can correctly identify the drop-error clause.
if noSlidingOverlap && queryUsesParserStages(spec.BaseQuery) && hasOrigSpec && hasDropErrorOnlyPostParserStage(origSpec.BaseQuery) {
return false
}
if !shouldUseManualRangeMetricCompat(spec.BaseQuery, manualFunc, noSlidingOverlap) {
return false
}
Expand Down Expand Up @@ -388,6 +411,16 @@ func shouldUseManualRangeMetricCompat(baseQuery, manualFunc string, rangeEqualsS
// natively supports inline filter pipelines and parser stages.
switch manualFunc {
case "rate", "bytes_rate", "count_over_time", "bytes_over_time":
// Parser stages require the slow log-fetch path: Loki excludes parse-failed lines
// from metric aggregation while VL native stats counts all lines. The drop-error
// opt-in check (hasDropErrorOnlyPostParserStage) only works on LogQL syntax; the
// baseQuery here is VL-translated syntax (| unpack_json etc.), so we conservatively
// route all parser-stage queries to the manual path. Bare parser metric queries
// (proxyBareParserMetricQueryRange) handle the drop-error fast path for the
// ungrouped case where the original LogQL is available.
if queryUsesParserStages(baseQuery) {
return true
}
return !rangeEqualsStep
}

Expand Down Expand Up @@ -444,7 +477,10 @@ func (p *Proxy) proxyManualRangeMetricRange(w http.ResponseWriter, r *http.Reque
case "__bytes__":
statsAggFunc = "sum_len(_msg) as c"
}
if statsAggFunc != "" {
// Mirror the parser-stage guard from shouldUseManualRangeMetricCompat: if the query
// uses parser stages, skip the stats_query_range fast path to preserve Loki's
// parse-error exclusion semantics (Loki excludes parse-failed lines; VL counts all).
if statsAggFunc != "" && !queryUsesParserStages(spec.BaseQuery) {
hasStreamSentinel := false
for _, g := range spec.GroupBy {
if g == "_stream" {
Expand Down Expand Up @@ -576,11 +612,12 @@ func (p *Proxy) collectRangeMetricHits(
defer resp.Body.Close()

if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
body, _ := readBodyLimited(resp.Body, maxUpstreamErrorBodyBytes)
return nil, fmt.Errorf("stats_query_range backend %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
}

body, err := io.ReadAll(resp.Body)
const maxStatsResponseBytes = 64 << 20 // 64 MB
body, err := readBodyLimited(resp.Body, maxStatsResponseBytes)
if err != nil {
return nil, err
}
Expand Down
Loading