diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 732324fec1f..16dce97111a 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -311,6 +311,18 @@ querier: # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. # CLI flag: -querier.parquet-queryable-shard-cache-ttl [parquet_queryable_shard_cache_ttl: | default = 24h] + + # [Experimental] If true, parquet queryable will honor projection hints and + # only materialize requested labels. Projection is only applied when all + # queried blocks are parquet blocks and not querying ingesters. + # CLI flag: -querier.parquet-queryable-honor-projection-hints + [parquet_queryable_honor_projection_hints: | default = false] + + # [Experimental] Time buffer to use when checking if query overlaps with + # ingester data. Projection hints are disabled if query time range overlaps + # with (now - query-ingesters-within - buffer). + # CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer + [parquet_queryable_projection_hints_ingester_buffer: | default = 1h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 58d1c89ebb9..13204941098 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4828,6 +4828,18 @@ thanos_engine: # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. # CLI flag: -querier.parquet-queryable-shard-cache-ttl [parquet_queryable_shard_cache_ttl: | default = 24h] + +# [Experimental] If true, parquet queryable will honor projection hints and only +# materialize requested labels. Projection is only applied when all queried +# blocks are parquet blocks and not querying ingesters. +# CLI flag: -querier.parquet-queryable-honor-projection-hints +[parquet_queryable_honor_projection_hints: | default = false] + +# [Experimental] Time buffer to use when checking if query overlaps with +# ingester data. Projection hints are disabled if query time range overlaps with +# (now - query-ingesters-within - buffer). +# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer +[parquet_queryable_projection_hints_ingester_buffer: | default = 1h] ``` ### `query_frontend_config` diff --git a/go.mod b/go.mod index a5f5e66dee1..9a26035ca69 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 + github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index e8d973c9536..8dab5b15dfe 100644 --- a/go.sum +++ b/go.sum @@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk= -github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= +github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8= +github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc= diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index f4bb5a6c60d..b7ad18f6d68 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -7,11 +7,13 @@ import ( "fmt" "math/rand" "path/filepath" + "slices" "strconv" "testing" "time" "github.com/cortexproject/promqlsmith" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" @@ -23,7 +25,9 @@ import ( e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/log" cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" ) @@ -176,3 +180,213 @@ func TestParquetFuzz(t *testing.T) { require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) } + +func TestParquetProjectionPushdownFuzz(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-querier.query-store-for-labels-enabled": "true", + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + // Querier - Enable Thanos engine with projection optimizer + "-querier.thanos-engine": "true", + "-querier.optimizers": "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection + "-querier.enable-parquet-queryable": "true", + "-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints + // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters + // Since test queries are 24-48h old, they won't query ingesters and projection will be enabled + "-querier.query-ingesters-within": "2h", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + numSeries := 20 + numSamples := 100 + lbls := make([]labels.Labels, 0, numSeries) + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + methods := []string{"GET", "POST", "PUT", "DELETE"} + now := time.Now() + // Make sure query time is old enough to not overlap with ingesters. + start := now.Add(-time.Hour * 72) + end := now.Add(-time.Hour * 48) + + // Create series with multiple labels + for i := range numSeries { + lbls = append(lbls, labels.FromStrings( + labels.MetricName, "http_requests_total", + "job", "api-server", + "instance", fmt.Sprintf("instance-%d", i%5), + "status_code", statusCodes[i%len(statusCodes)], + "method", methods[i%len(methods)], + "path", fmt.Sprintf("/api/v1/endpoint%d", i%3), + "cluster", "test-cluster", + )) + } + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := storage.GetBucket() + userBucket := bucket.NewUserBucketClient("user-1", bkt, nil) + + err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait until we convert the blocks to parquet AND bucket index is updated + cortex_testutil.Poll(t, 300*time.Second, true, func() interface{} { + // Check if parquet marker exists + markerFound := false + err := userBucket.Iter(context.Background(), "", func(name string) error { + if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { + markerFound = true + } + return nil + }, objstore.WithRecursiveIter()) + if err != nil || !markerFound { + return false + } + + // Check if bucket index exists AND contains the parquet block metadata + idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger) + if err != nil { + return false + } + + // Verify the block is in the bucket index with parquet metadata + for _, b := range idx.Blocks { + if b.ID == id && b.Parquet != nil { + require.True(t, b.Parquet.Version == cortex_parquet.CurrentVersion) + return true + } + } + return false + }) + + c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + testCases := []struct { + name string + query string + expectedLabels []string // Labels that should be present in result + }{ + { + name: "vector selector query should not use projection", + query: `http_requests_total`, + expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"}, + }, + { + name: "simple_sum_by_job", + query: `sum by (job) (http_requests_total)`, + expectedLabels: []string{"job"}, + }, + { + name: "rate_with_aggregation", + query: `sum by (method) (rate(http_requests_total[5m]))`, + expectedLabels: []string{"method"}, + }, + { + name: "multiple_grouping_labels", + query: `sum by (job, status_code) (http_requests_total)`, + expectedLabels: []string{"job", "status_code"}, + }, + { + name: "aggregation without query", + query: `sum without (instance, method) (http_requests_total)`, + expectedLabels: []string{"job", "status_code", "path", "cluster"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Testing: %s", tc.query) + + // Execute instant query + result, err := c.Query(tc.query, end) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify we got results + vector, ok := result.(model.Vector) + require.True(t, ok, "result should be a vector") + require.NotEmpty(t, vector, "query should return results") + + t.Logf("Query returned %d series", len(vector)) + + // Verify projection worked: series should only have the expected labels + for _, sample := range vector { + actualLabels := make(map[string]struct{}) + for label := range sample.Metric { + actualLabels[string(label)] = struct{}{} + } + + // Check that all expected labels are present + for _, expectedLabel := range tc.expectedLabels { + _, ok := actualLabels[expectedLabel] + require.True(t, ok, + "series should have %s label", expectedLabel) + } + + // Check that no unexpected labels are present + for lbl := range actualLabels { + if !slices.Contains(tc.expectedLabels, lbl) { + require.Fail(t, "series should not have unexpected label: %s", lbl) + } + } + } + }) + } + + // Verify that parquet blocks were queried + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) +} diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 40be5ff8997..78ba15b350a 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -28,6 +28,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util" @@ -96,11 +97,13 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery type parquetQueryableWithFallback struct { services.Service - fallbackDisabled bool - queryStoreAfter time.Duration - parquetQueryable storage.Queryable - cache cacheInterface[parquet_storage.ParquetShard] - blockStorageQueryable *BlocksStoreQueryable + fallbackDisabled bool + queryStoreAfter time.Duration + queryIngestersWithin time.Duration + projectionHintsIngesterBuffer time.Duration + parquetQueryable storage.Queryable + cache cacheInterface[parquet_storage.ParquetShard] + blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -148,6 +151,7 @@ func NewParquetQueryable( } parquetQueryableOpts := []queryable.QueryableOpts{ + queryable.WithHonorProjectionHints(config.ParquetQueryableHonorProjectionHints), queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { // Ignore error as this shouldn't happen. // If failed to resolve tenant we will just use the default limit value. @@ -253,18 +257,20 @@ func NewParquetQueryable( }, constraintCacheFunc, cDecoder, parquetQueryableOpts...) p := &parquetQueryableWithFallback{ - subservices: manager, - blockStorageQueryable: blockStorageQueryable, - parquetQueryable: parquetQueryable, - cache: cache, - queryStoreAfter: config.QueryStoreAfter, - subservicesWatcher: services.NewFailureWatcher(), - finder: blockStorageQueryable.finder, - metrics: newParquetQueryableFallbackMetrics(reg), - limits: limits, - logger: logger, - defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore), - fallbackDisabled: config.ParquetQueryableFallbackDisabled, + subservices: manager, + blockStorageQueryable: blockStorageQueryable, + parquetQueryable: parquetQueryable, + cache: cache, + queryStoreAfter: config.QueryStoreAfter, + queryIngestersWithin: config.QueryIngestersWithin, + projectionHintsIngesterBuffer: config.ParquetQueryableProjectionHintsIngesterBuffer, + subservicesWatcher: services.NewFailureWatcher(), + finder: blockStorageQueryable.finder, + metrics: newParquetQueryableFallbackMetrics(reg), + limits: limits, + logger: logger, + defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore), + fallbackDisabled: config.ParquetQueryableFallbackDisabled, } p.Service = services.NewBasicService(p.starting, p.running, p.stopping) @@ -311,17 +317,19 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie } return &parquetQuerierWithFallback{ - minT: mint, - maxT: maxt, - parquetQuerier: pq, - queryStoreAfter: p.queryStoreAfter, - blocksStoreQuerier: bsq, - finder: p.finder, - metrics: p.metrics, - limits: p.limits, - logger: p.logger, - defaultBlockStoreType: p.defaultBlockStoreType, - fallbackDisabled: p.fallbackDisabled, + minT: mint, + maxT: maxt, + parquetQuerier: pq, + queryStoreAfter: p.queryStoreAfter, + queryIngestersWithin: p.queryIngestersWithin, + projectionHintsIngesterBuffer: p.projectionHintsIngesterBuffer, + blocksStoreQuerier: bsq, + finder: p.finder, + metrics: p.metrics, + limits: p.limits, + logger: p.logger, + defaultBlockStoreType: p.defaultBlockStoreType, + fallbackDisabled: p.fallbackDisabled, }, nil } @@ -335,7 +343,9 @@ type parquetQuerierWithFallback struct { // If set, the querier manipulates the max time to not be greater than // "now - queryStoreAfter" so that most recent blocks are not queried. - queryStoreAfter time.Duration + queryStoreAfter time.Duration + queryIngestersWithin time.Duration + projectionHintsIngesterBuffer time.Duration // metrics metrics *parquetQueryableFallbackMetrics @@ -500,6 +510,18 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool sortSeries = true } + queryIngesters := q.queryIngestersWithin == 0 || maxt >= util.TimeToMillis(time.Now().Add(-q.queryIngestersWithin).Add(-q.projectionHintsIngesterBuffer)) + // Only enable projection if ProjectionInclude is true. + disableProjection := len(remaining) > 0 || queryIngesters || !hints.ProjectionInclude || !allParquetBlocksHaveHashColumn(parquet) + // Reset projection hints if: + // - there are mixed blocks (both parquet and non-parquet) + // - the query needs to merge results between ingester and parquet blocks + // - not all parquet blocks have hash column (version < 2) + if disableProjection { + hints.ProjectionLabels = nil + hints.ProjectionInclude = false + } + promises := make([]chan storage.SeriesSet, 0, 2) if len(parquet) > 0 { @@ -599,6 +621,17 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining } } +// allParquetBlocksHaveHashColumn checks if all parquet blocks have version >= 2, which means they have the hash column. +// Parquet blocks with version 1 don't have the hash column, so projection cannot be enabled for them. +func allParquetBlocksHaveHashColumn(blocks []*bucketindex.Block) bool { + for _, b := range blocks { + if b.Parquet == nil || b.Parquet.Version < cortex_parquet.ParquetConverterMarkVersion2 { + return false + } + } + return true +} + type shardMatcherLabelsFilter struct { shardMatcher *storepb.ShardMatcher } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 250b1831442..912d5147c55 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -173,7 +173,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -242,8 +242,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select", func(t *testing.T) { @@ -316,8 +316,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select", func(t *testing.T) { @@ -631,6 +631,316 @@ func (mockParquetQuerier) Close() error { return nil } +func TestSelectProjectionHints(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + now := time.Now() + + tests := map[string]struct { + minT int64 + maxT int64 + queryIngestersWithin time.Duration + projectionHintsIngesterBuffer time.Duration + hasRemainingBlocks bool // Whether there are non-parquet (TSDB) blocks + parquetBlockVersion int // Version of parquet blocks (1 or 2) + mixedVersions bool // If true, block1 is v1, block2 is v2 + inputProjectionLabels []string + inputProjectionInclude bool // Input ProjectionInclude value + expectedProjectionLabels []string // nil means projection disabled + expectedProjectionInclude bool + }{ + "projection enabled: all parquet blocks v2, query older than ingester window": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, // Version 2 has hash column + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection disabled: mixed blocks (parquet + TSDB)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: true, // Mixed blocks + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + "projection disabled: query overlaps with ingester window": { + minT: util.TimeToMillis(now.Add(-1 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset (maxT >= now - 2h - 1h = now - 3h) + expectedProjectionInclude: false, + }, + "projection disabled: query within buffer zone": { + minT: util.TimeToMillis(now.Add(-4 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-2*time.Hour - 30*time.Minute)), // Well within buffer + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset (maxT = now - 2.5h, threshold = now - 3h, so 2.5h > 3h means disabled) + expectedProjectionInclude: false, + }, + "projection disabled: queryIngestersWithin is 0 (always query ingesters)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 0, // Always query ingesters + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + "projection enabled: query just outside ingester window with buffer": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-3*time.Hour - 1*time.Minute)), // Just before threshold + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection enabled: no buffer, query outside ingester window": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-2*time.Hour - 1*time.Minute)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 0, // No buffer + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection disabled: recent query (current time)": { + minT: util.TimeToMillis(now.Add(-1 * time.Hour)), + maxT: util.TimeToMillis(now), // Right now + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + "projection disabled: ProjectionInclude is false, disable projection": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"job"}, + inputProjectionInclude: false, + expectedProjectionLabels: nil, + expectedProjectionInclude: false, + }, + "projection disabled: parquet blocks version 1 (no hash column)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion1, // Version 1 doesn't have hash column + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset because version 1 doesn't support projection + expectedProjectionInclude: false, + }, + "projection disabled: mixed parquet block versions (v1 and v2)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + mixedVersions: true, // block1 is v1, block2 is v2 + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset because not all blocks support projection + expectedProjectionInclude: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "user-1") + finder := &blocksFinderMock{} + + // Setup blocks + var blocks bucketindex.Blocks + if testData.hasRemainingBlocks { + // Mixed: one parquet, one TSDB + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + &bucketindex.Block{ID: block2}, // No parquet metadata = TSDB block + } + } else if testData.mixedVersions { + // Mixed parquet versions: block1 is v1, block2 is v2 + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + } + } else { + // All parquet with same version + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + } + } + finder.On("GetBlocks", mock.Anything, "user-1", testData.minT, mock.Anything, mock.Anything).Return(blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark{}, nil) + + // Mock TSDB querier (for remaining blocks) + mockTSDBQuerier := &mockParquetQuerier{} + + // Mock parquet querier (captures hints) + mockParquetQuerierInstance := &mockParquetQuerier{} + + // Create the parquetQuerierWithFallback + pq := &parquetQuerierWithFallback{ + minT: testData.minT, + maxT: testData.maxT, + finder: finder, + blocksStoreQuerier: mockTSDBQuerier, + parquetQuerier: mockParquetQuerierInstance, + queryIngestersWithin: testData.queryIngestersWithin, + projectionHintsIngesterBuffer: testData.projectionHintsIngesterBuffer, + queryStoreAfter: 0, // Disable queryStoreAfter manipulation + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + fallbackDisabled: false, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"), + } + + // Create input hints with projection + inputHints := &storage.SelectHints{ + Start: testData.minT, + End: testData.maxT, + ProjectionLabels: testData.inputProjectionLabels, + ProjectionInclude: testData.inputProjectionInclude, + } + + // Execute Select + set := pq.Select(ctx, false, inputHints, matchers...) + require.NotNil(t, set) + + // Verify the hints passed to the parquet querier + if !testData.hasRemainingBlocks { + // If all parquet blocks, verify hints passed to parquet querier + require.NotNil(t, mockParquetQuerierInstance.queriedHints, "parquet querier should have been called") + require.Equal(t, testData.expectedProjectionLabels, mockParquetQuerierInstance.queriedHints.ProjectionLabels, + "projection labels mismatch") + require.Equal(t, testData.expectedProjectionInclude, mockParquetQuerierInstance.queriedHints.ProjectionInclude, + "projection include flag mismatch") + } + }) + } +} + +func TestAllParquetBlocksHaveHashColumn(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + block3 := ulid.MustNew(3, nil) + + tests := map[string]struct { + blocks []*bucketindex.Block + expected bool + }{ + "all blocks v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + "all blocks v1": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + }, + expected: false, + }, + "mixed versions v1 and v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: false, + }, + "one block nil parquet metadata": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + {ID: block2, Parquet: nil}, + }, + expected: false, + }, + "all blocks nil parquet metadata": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: nil}, + {ID: block2, Parquet: nil}, + }, + expected: false, + }, + "single block v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + "single block v1": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + }, + expected: false, + }, + "empty blocks": { + blocks: []*bucketindex.Block{}, + expected: true, // No blocks with version < 2, so return true + }, + "all blocks v3 or higher": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 3}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 4}}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + result := allParquetBlocksHaveHashColumn(testData.blocks) + require.Equal(t, testData.expected, result, "unexpected result for %s", testName) + }) + } +} + func TestMaterializedLabelsFilterCallback(t *testing.T) { tests := []struct { name string @@ -798,7 +1108,7 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { // Set up blocks where block1 has parquet metadata but block2 doesn't finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet &bucketindex.Block{ID: block2}, // Not available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -857,8 +1167,8 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { // Set up blocks where both blocks have parquet metadata finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select should work without error", func(t *testing.T) { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a6912ea024a..0de2c714101 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -92,11 +92,13 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + ParquetQueryableHonorProjectionHints bool `yaml:"parquet_queryable_honor_projection_hints"` + ParquetQueryableProjectionHintsIngesterBuffer time.Duration `yaml:"parquet_queryable_projection_hints_ingester_buffer"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -150,6 +152,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") + f.BoolVar(&cfg.ParquetQueryableHonorProjectionHints, "querier.parquet-queryable-honor-projection-hints", false, "[Experimental] If true, parquet queryable will honor projection hints and only materialize requested labels. Projection is only applied when all queried blocks are parquet blocks and not querying ingesters.") + f.DurationVar(&cfg.ParquetQueryableProjectionHintsIngesterBuffer, "querier.parquet-queryable-projection-hints-ingester-buffer", time.Hour, "[Experimental] Time buffer to use when checking if query overlaps with ingester data. Projection hints are disabled if query time range overlaps with (now - query-ingesters-within - buffer).") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 66fbc2d76db..c9315ad43fe 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5840,6 +5840,19 @@ "type": "boolean", "x-cli-flag": "querier.parquet-queryable-fallback-disabled" }, + "parquet_queryable_honor_projection_hints": { + "default": false, + "description": "[Experimental] If true, parquet queryable will honor projection hints and only materialize requested labels. Projection is only applied when all queried blocks are parquet blocks and not querying ingesters.", + "type": "boolean", + "x-cli-flag": "querier.parquet-queryable-honor-projection-hints" + }, + "parquet_queryable_projection_hints_ingester_buffer": { + "default": "1h0m0s", + "description": "[Experimental] Time buffer to use when checking if query overlaps with ingester data. Projection hints are disabled if query time range overlaps with (now - query-ingesters-within - buffer).", + "type": "string", + "x-cli-flag": "querier.parquet-queryable-projection-hints-ingester-buffer", + "x-format": "duration" + }, "parquet_queryable_shard_cache_size": { "default": 512, "description": "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.", diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 24daad7ef26..01c854275cf 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -467,7 +467,8 @@ func (m *Materializer) MaterializeLabels(ctx context.Context, hints *prom_storag break } } - needsHash = !seriesHashExcluded + // Hash column is only needed if projection labels are provided and series hash is not excluded. + needsHash = !seriesHashExcluded && len(hints.ProjectionLabels) > 0 for _, labelName := range hints.ProjectionLabels { if labelName == schema.SeriesHashColumn { diff --git a/vendor/modules.txt b/vendor/modules.txt index d4aecb1b3e7..91fa9c2d44c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -952,7 +952,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 +# github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 ## explicit; go 1.24.0 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable