12 changes: 12 additions & 0 deletions docs/blocks-storage/querier.md
@@ -311,6 +311,18 @@ querier:
# [Experimental] TTL of the Parquet queryable shard cache. 0 means no TTL.
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If true, the parquet queryable honors projection hints and
# materializes only the requested labels. Projection is applied only when all
# queried blocks are parquet blocks and the query does not touch ingesters.
# CLI flag: -querier.parquet-queryable-honor-projection-hints
[parquet_queryable_honor_projection_hints: <boolean> | default = false]

# [Experimental] Time buffer to use when checking whether a query overlaps
# with ingester data. Projection hints are disabled if the query time range
# overlaps with (now - query-ingesters-within - buffer).
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
```
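Taken together, the two new options extend the `querier` block shown above. A minimal sketch of how they could be combined (the `query_ingesters_within` key for the ingester lookback window is assumed here; it is not part of this change):

```yaml
querier:
  # Assumed existing setting: how far back queries may still hit ingesters.
  query_ingesters_within: 2h
  # Added by this change: honor projection hints for parquet-only queries.
  parquet_queryable_honor_projection_hints: true
  # Added by this change: projection stays disabled for any query whose time
  # range overlaps (now - query_ingesters_within - 1h).
  parquet_queryable_projection_hints_ingester_buffer: 1h
```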

### `blocks_storage_config`
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4828,6 +4828,18 @@ thanos_engine:
# [Experimental] TTL of the Parquet queryable shard cache. 0 means no TTL.
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If true, the parquet queryable honors projection hints and
# materializes only the requested labels. Projection is applied only when all
# queried blocks are parquet blocks and the query does not touch ingesters.
# CLI flag: -querier.parquet-queryable-honor-projection-hints
[parquet_queryable_honor_projection_hints: <boolean> | default = false]

# [Experimental] Time buffer to use when checking whether a query overlaps with
# ingester data. Projection hints are disabled if the query time range overlaps
# with (now - query-ingesters-within - buffer).
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
```

### `query_frontend_config`
2 changes: 1 addition & 1 deletion go.mod
@@ -87,7 +87,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.25.1
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a
github.com/prometheus/procfs v0.16.1
github.com/sercand/kuberesolver/v5 v5.1.1
4 changes: 2 additions & 2 deletions go.sum
@@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk=
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc=
213 changes: 213 additions & 0 deletions integration/parquet_querier_test.go
@@ -7,11 +7,13 @@ import (
"fmt"
"math/rand"
"path/filepath"
"slices"
"strconv"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
@@ -24,6 +26,7 @@ import (
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)
@@ -176,3 +179,213 @@ func TestParquetFuzz(t *testing.T) {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}

func TestParquetProjectionPushdown(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// compactor
"-compactor.cleanup-interval": "1s",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
// parquet-converter
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.enabled": "true",
// Querier - Enable Thanos engine with projection optimizer
"-querier.thanos-engine": "true",
"-querier.optimizers": "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection
"-querier.enable-parquet-queryable": "true",
"-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints
// Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters
// Since test queries are 24-48h old, they won't query ingesters and projection will be enabled
"-querier.query-ingesters-within": "2h",
// Enable cache for parquet labels and chunks
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
dir := filepath.Join(s.SharedDir(), "data")
numSeries := 20
numSamples := 100
lbls := make([]labels.Labels, 0, numSeries)
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
methods := []string{"GET", "POST", "PUT", "DELETE"}
now := time.Now()
// Make sure query time is old enough to not overlap with ingesters
// With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters
// Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled
start := now.Add(-time.Hour * 48)
end := now.Add(-time.Hour * 24)

// Create series with multiple labels
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.FromStrings(
labels.MetricName, "http_requests_total",
"job", "api-server",
"instance", fmt.Sprintf("instance-%d", i%5),
"status_code", statusCodes[i%len(statusCodes)],
"method", methods[i%len(methods)],
"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
"cluster", "test-cluster",
))
}

id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait until we convert the blocks to parquet AND bucket index is updated
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
// Check if parquet marker exists
markerFound := false
err := bkt.Iter(context.Background(), "", func(name string) error {
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
markerFound = true
}
return nil
}, objstore.WithRecursiveIter())
if err != nil || !markerFound {
return false
}

// Check if bucket index exists AND contains the parquet block metadata
idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger)
if err != nil {
return false
}

// Verify the block is in the bucket index with parquet metadata
for _, b := range idx.Blocks {
if b.ID == id && b.Parquet != nil {
return true
}
}
return false
})

c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

testCases := []struct {
name string
query string
expectedLabels []string // Labels that should be present in result
}{
{
name: "vector selector query should not use projection",
query: `http_requests_total`,
expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"},
},
{
name: "simple_sum_by_job",
query: `sum by (job) (http_requests_total)`,
expectedLabels: []string{"job"},
},
{
name: "rate_with_aggregation",
query: `sum by (method) (rate(http_requests_total[5m]))`,
expectedLabels: []string{"method"},
},
{
name: "multiple_grouping_labels",
query: `sum by (job, status_code) (http_requests_total)`,
expectedLabels: []string{"job", "status_code"},
},
{
name: "aggregation without query",
query: `sum without (instance, method) (http_requests_total)`,
expectedLabels: []string{"job", "status_code", "path", "cluster"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Logf("Testing: %s", tc.query)

// Execute instant query
result, err := c.Query(tc.query, end)
require.NoError(t, err)
require.NotNil(t, result)

// Verify we got results
vector, ok := result.(model.Vector)
require.True(t, ok, "result should be a vector")
require.NotEmpty(t, vector, "query should return results")

t.Logf("Query returned %d series", len(vector))

// Verify projection worked: series should only have the expected labels
for _, sample := range vector {
actualLabels := make(map[string]struct{})
for labelName := range sample.Metric {
actualLabels[string(labelName)] = struct{}{}
}

// Check that all expected labels are present
for _, expectedLabel := range tc.expectedLabels {
_, ok := actualLabels[expectedLabel]
require.True(t, ok,
"series should have %s label", expectedLabel)
}

// Check that no unexpected labels are present
for lbl := range actualLabels {
if !slices.Contains(tc.expectedLabels, lbl) {
require.Fail(t, fmt.Sprintf("series should not have unexpected label: %s", lbl))
}
}
}
})
}

// Verify that parquet blocks were queried
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}