From 4bc04736617502fd3377403b31d7823dfb2cea71 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 9 Dec 2025 16:57:43 -0800 Subject: [PATCH 1/8] expose projection hints in parquet queryable Signed-off-by: yeya24 --- docs/blocks-storage/querier.md | 12 ++ docs/configuration/config-file-reference.md | 12 ++ integration/parquet_querier_test.go | 189 ++++++++++++++++++++ pkg/querier/parquet_queryable.go | 75 +++++--- pkg/querier/parquet_queryable_test.go | 171 ++++++++++++++++++ pkg/querier/querier.go | 14 +- schemas/cortex-config-schema.json | 13 ++ 7 files changed, 452 insertions(+), 34 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 732324fec1f..16dce97111a 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -311,6 +311,18 @@ querier: # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. # CLI flag: -querier.parquet-queryable-shard-cache-ttl [parquet_queryable_shard_cache_ttl: | default = 24h] + + # [Experimental] If true, parquet queryable will honor projection hints and + # only materialize requested labels. Projection is only applied when all + # queried blocks are parquet blocks and not querying ingesters. + # CLI flag: -querier.parquet-queryable-honor-projection-hints + [parquet_queryable_honor_projection_hints: | default = false] + + # [Experimental] Time buffer to use when checking if query overlaps with + # ingester data. Projection hints are disabled if query time range overlaps + # with (now - query-ingesters-within - buffer). + # CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer + [parquet_queryable_projection_hints_ingester_buffer: | default = 1h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 58d1c89ebb9..13204941098 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4828,6 +4828,18 @@ thanos_engine: # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. # CLI flag: -querier.parquet-queryable-shard-cache-ttl [parquet_queryable_shard_cache_ttl: | default = 24h] + +# [Experimental] If true, parquet queryable will honor projection hints and only +# materialize requested labels. Projection is only applied when all queried +# blocks are parquet blocks and not querying ingesters. +# CLI flag: -querier.parquet-queryable-honor-projection-hints +[parquet_queryable_honor_projection_hints: | default = false] + +# [Experimental] Time buffer to use when checking if query overlaps with +# ingester data. Projection hints are disabled if query time range overlaps with +# (now - query-ingesters-within - buffer). +# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer +[parquet_queryable_projection_hints_ingester_buffer: | default = 1h] ``` ### `query_frontend_config` diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index f4bb5a6c60d..ad797b2dbf2 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -7,12 +7,14 @@ import ( "fmt" "math/rand" "path/filepath" + "slices" "strconv" "testing" "time" "github.com/cortexproject/promqlsmith" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" @@ -176,3 +178,190 @@ func TestParquetFuzz(t *testing.T) { require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) } + +func TestParquetProjectionPushdown(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-querier.query-store-for-labels-enabled": "true", + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + // Querier - Enable Thanos engine with projection optimizer + "-querier.thanos-engine": "true", + "-querier.optimizers": "default,projection", // Enable projection optimizer + "-querier.enable-parquet-queryable": "true", + "-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints + // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters + // Since test queries are 24-48h old, they won't query ingesters and projection will be enabled + "-querier.query-ingesters-within": "2h", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + numSeries := 20 + numSamples := 100 + lbls := make([]labels.Labels, 0, numSeries) + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + methods := []string{"GET", "POST", "PUT", "DELETE"} + now := time.Now() + // Make sure query time is old enough to not overlap with ingesters + // With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters + // Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled + start := now.Add(-time.Hour * 48) + end := now.Add(-time.Hour * 24) + + // Create series with multiple labels + for i := 0; i < numSeries; i++ { + lbls = append(lbls, labels.FromStrings( + labels.MetricName, "http_requests_total", + "job", "api-server", + "instance", fmt.Sprintf("instance-%d", i%5), + "status_code", statusCodes[i%len(statusCodes)], + "method", methods[i%len(methods)], + "path", fmt.Sprintf("/api/v1/endpoint%d", i%3), + "cluster", "test-cluster", + )) + } + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait until we convert the blocks to parquet + cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { + found := false + foundBucketIndex := false + + err := bkt.Iter(context.Background(), "", func(name string) error { + if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { + found = true + } + if name == "bucket-index.json.gz" { + foundBucketIndex = true + } + return nil + }, objstore.WithRecursiveIter()) + require.NoError(t, err) + return found && foundBucketIndex + }) + + c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Test queries that should use projection hints + testCases := []struct { + name string + query string + expectedLabels []string // Labels that should be present in result (besides __name__) + }{ + { + name: "simple_sum_by_job", + query: `sum by (job) (http_requests_total)`, + expectedLabels: []string{"job"}, + }, + { + name: "rate_with_aggregation", + query: `sum by (method) (rate(http_requests_total[5m]))`, + expectedLabels: []string{"method"}, + }, + { + name: "multiple_grouping_labels", + query: `sum by (job, status_code) (http_requests_total)`, + expectedLabels: []string{"job", "status_code"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Testing: %s", tc.query) + + // Execute instant query + result, err := c.Query(tc.query, end) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify we got results + matrix := result.(promql.Matrix) + require.NotEmpty(t, matrix, "query should return results") + + t.Logf("Query returned %d series", len(matrix)) + + // Verify projection worked: series should only have the expected labels + for i, series := range matrix { + actualLabels := make(map[string]struct{}) + for _, label := range series.Metric { + actualLabels[label.Name] = struct{}{} + } + + // Check that no unexpected labels are present + for lbl := range actualLabels { + if !slices.Contains(tc.expectedLabels, lbl) { + require.Fail(t, "series should not have %s label", lbl) + } + } + // Check that all expected labels are present + for _, expectedLabel := range tc.expectedLabels { + require.True(t, actualLabels[expectedLabel], + "series should have %s label", expectedLabel) + } + } + }) + } + + // Verify that parquet blocks were queried + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) +} diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 40be5ff8997..4872eade8d7 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -96,11 +96,13 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery type parquetQueryableWithFallback struct { services.Service - fallbackDisabled bool - queryStoreAfter time.Duration - parquetQueryable storage.Queryable - cache cacheInterface[parquet_storage.ParquetShard] - blockStorageQueryable *BlocksStoreQueryable + fallbackDisabled bool + queryStoreAfter time.Duration + queryIngestersWithin time.Duration + projectionHintsIngesterBuffer time.Duration + parquetQueryable storage.Queryable + cache cacheInterface[parquet_storage.ParquetShard] + blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -148,6 +150,7 @@ func NewParquetQueryable( } parquetQueryableOpts := []queryable.QueryableOpts{ + queryable.WithHonorProjectionHints(config.ParquetQueryableHonorProjectionHints), queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { // Ignore error as this shouldn't happen. // If failed to resolve tenant we will just use the default limit value. @@ -253,18 +256,20 @@ func NewParquetQueryable( }, constraintCacheFunc, cDecoder, parquetQueryableOpts...) p := &parquetQueryableWithFallback{ - subservices: manager, - blockStorageQueryable: blockStorageQueryable, - parquetQueryable: parquetQueryable, - cache: cache, - queryStoreAfter: config.QueryStoreAfter, - subservicesWatcher: services.NewFailureWatcher(), - finder: blockStorageQueryable.finder, - metrics: newParquetQueryableFallbackMetrics(reg), - limits: limits, - logger: logger, - defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore), - fallbackDisabled: config.ParquetQueryableFallbackDisabled, + subservices: manager, + blockStorageQueryable: blockStorageQueryable, + parquetQueryable: parquetQueryable, + cache: cache, + queryStoreAfter: config.QueryStoreAfter, + queryIngestersWithin: config.QueryIngestersWithin, + projectionHintsIngesterBuffer: config.ParquetQueryableProjectionHintsIngesterBuffer, + subservicesWatcher: services.NewFailureWatcher(), + finder: blockStorageQueryable.finder, + metrics: newParquetQueryableFallbackMetrics(reg), + limits: limits, + logger: logger, + defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore), + fallbackDisabled: config.ParquetQueryableFallbackDisabled, } p.Service = services.NewBasicService(p.starting, p.running, p.stopping) @@ -311,17 +316,19 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie } return &parquetQuerierWithFallback{ - minT: mint, - maxT: maxt, - parquetQuerier: pq, - queryStoreAfter: p.queryStoreAfter, - blocksStoreQuerier: bsq, - finder: p.finder, - metrics: p.metrics, - limits: p.limits, - logger: p.logger, - defaultBlockStoreType: p.defaultBlockStoreType, - fallbackDisabled: p.fallbackDisabled, + minT: mint, + maxT: maxt, + parquetQuerier: pq, + queryStoreAfter: p.queryStoreAfter, + queryIngestersWithin: p.queryIngestersWithin, + projectionHintsIngesterBuffer: p.projectionHintsIngesterBuffer, + blocksStoreQuerier: bsq, + finder: p.finder, + metrics: p.metrics, + limits: p.limits, + logger: p.logger, + defaultBlockStoreType: p.defaultBlockStoreType, + fallbackDisabled: p.fallbackDisabled, }, nil } @@ -335,7 +342,9 @@ type parquetQuerierWithFallback struct { // If set, the querier manipulates the max time to not be greater than // "now - queryStoreAfter" so that most recent blocks are not queried. - queryStoreAfter time.Duration + queryStoreAfter time.Duration + queryIngestersWithin time.Duration + projectionHintsIngesterBuffer time.Duration // metrics metrics *parquetQueryableFallbackMetrics @@ -500,6 +509,14 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool sortSeries = true } + queryIngesters := q.queryIngestersWithin == 0 || maxt >= util.TimeToMillis(time.Now().Add(-q.queryIngestersWithin).Add(-q.projectionHintsIngesterBuffer)) + disableProjection := len(remaining) > 0 || queryIngesters + // Reset projection hints if there are mixed blocks (both parquet and non-parquet) or the query needs to merge results between ingester and parquet blocks + if disableProjection { + hints.ProjectionLabels = nil + hints.ProjectionInclude = false + } + promises := make([]chan storage.SeriesSet, 0, 2) if len(parquet) > 0 { diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 250b1831442..fdea54b3013 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -631,6 +631,177 @@ func (mockParquetQuerier) Close() error { return nil } +func TestSelectProjectionHints(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + now := time.Now() + + tests := map[string]struct { + minT int64 + maxT int64 + queryIngestersWithin time.Duration + projectionHintsIngesterBuffer time.Duration + hasRemainingBlocks bool // Whether there are non-parquet (TSDB) blocks + inputProjectionLabels []string + expectedProjectionLabels []string // nil means projection disabled + expectedProjectionInclude bool + }{ + "projection enabled: all parquet blocks, query older than ingester window": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection disabled: mixed blocks (parquet + TSDB)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: true, // Mixed blocks + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + "projection disabled: query overlaps with ingester window": { + minT: util.TimeToMillis(now.Add(-1 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: nil, // Reset (maxT >= now - 2h - 1h = now - 3h) + expectedProjectionInclude: false, + }, + "projection disabled: query within buffer zone": { + minT: util.TimeToMillis(now.Add(-4 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-2*time.Hour - 30*time.Minute)), // Well within buffer + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: nil, // Reset (maxT = now - 2.5h, threshold = now - 3h, so 2.5h > 3h means disabled) + expectedProjectionInclude: false, + }, + "projection disabled: queryIngestersWithin is 0 (always query ingesters)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 0, // Always query ingesters + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + "projection enabled: query just outside ingester window with buffer": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-3*time.Hour - 1*time.Minute)), // Just before threshold + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection enabled: no buffer, query outside ingester window": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-2*time.Hour - 1*time.Minute)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 0, // No buffer + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection disabled: recent query (current time)": { + minT: util.TimeToMillis(now.Add(-1 * time.Hour)), + maxT: util.TimeToMillis(now), // Right now + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"__name__", "job"}, + expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "user-1") + finder := &blocksFinderMock{} + + // Setup blocks + var blocks bucketindex.Blocks + if testData.hasRemainingBlocks { + // Mixed: one parquet, one TSDB + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block2}, // No parquet metadata = TSDB block + } + } else { + // All parquet + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + } + } + finder.On("GetBlocks", mock.Anything, "user-1", testData.minT, mock.Anything, mock.Anything).Return(blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark{}, nil) + + // Mock TSDB querier (for remaining blocks) + mockTSDBQuerier := &mockParquetQuerier{} + + // Mock parquet querier (captures hints) + mockParquetQuerierInstance := &mockParquetQuerier{} + + // Create the parquetQuerierWithFallback + pq := &parquetQuerierWithFallback{ + minT: testData.minT, + maxT: testData.maxT, + finder: finder, + blocksStoreQuerier: mockTSDBQuerier, + parquetQuerier: mockParquetQuerierInstance, + queryIngestersWithin: testData.queryIngestersWithin, + projectionHintsIngesterBuffer: testData.projectionHintsIngesterBuffer, + queryStoreAfter: 0, // Disable queryStoreAfter manipulation + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + fallbackDisabled: false, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"), + } + + // Create input hints with projection + inputHints := &storage.SelectHints{ + Start: testData.minT, + End: testData.maxT, + ProjectionLabels: testData.inputProjectionLabels, + ProjectionInclude: true, + } + + // Execute Select + set := pq.Select(ctx, false, inputHints, matchers...) + require.NotNil(t, set) + + // Verify the hints passed to the parquet querier + if !testData.hasRemainingBlocks { + // If all parquet blocks, verify hints passed to parquet querier + require.NotNil(t, mockParquetQuerierInstance.queriedHints, "parquet querier should have been called") + require.Equal(t, testData.expectedProjectionLabels, mockParquetQuerierInstance.queriedHints.ProjectionLabels, + "projection labels mismatch") + require.Equal(t, testData.expectedProjectionInclude, mockParquetQuerierInstance.queriedHints.ProjectionInclude, + "projection include flag mismatch") + } + }) + } +} + func TestMaterializedLabelsFilterCallback(t *testing.T) { tests := []struct { name string diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a6912ea024a..0de2c714101 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -92,11 +92,13 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + ParquetQueryableHonorProjectionHints bool `yaml:"parquet_queryable_honor_projection_hints"` + ParquetQueryableProjectionHintsIngesterBuffer time.Duration `yaml:"parquet_queryable_projection_hints_ingester_buffer"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -150,6 +152,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") + f.BoolVar(&cfg.ParquetQueryableHonorProjectionHints, "querier.parquet-queryable-honor-projection-hints", false, "[Experimental] If true, parquet queryable will honor projection hints and only materialize requested labels. Projection is only applied when all queried blocks are parquet blocks and not querying ingesters.") + f.DurationVar(&cfg.ParquetQueryableProjectionHintsIngesterBuffer, "querier.parquet-queryable-projection-hints-ingester-buffer", time.Hour, "[Experimental] Time buffer to use when checking if query overlaps with ingester data. Projection hints are disabled if query time range overlaps with (now - query-ingesters-within - buffer).") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 66fbc2d76db..c9315ad43fe 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5840,6 +5840,19 @@ "type": "boolean", "x-cli-flag": "querier.parquet-queryable-fallback-disabled" }, + "parquet_queryable_honor_projection_hints": { + "default": false, + "description": "[Experimental] If true, parquet queryable will honor projection hints and only materialize requested labels. Projection is only applied when all queried blocks are parquet blocks and not querying ingesters.", + "type": "boolean", + "x-cli-flag": "querier.parquet-queryable-honor-projection-hints" + }, + "parquet_queryable_projection_hints_ingester_buffer": { + "default": "1h0m0s", + "description": "[Experimental] Time buffer to use when checking if query overlaps with ingester data. Projection hints are disabled if query time range overlaps with (now - query-ingesters-within - buffer).", + "type": "string", + "x-cli-flag": "querier.parquet-queryable-projection-hints-ingester-buffer", + "x-format": "duration" + }, "parquet_queryable_shard_cache_size": { "default": 512, "description": "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.", From c4dbd84fd76ff9ca479c9297bd16cc256c13ce21 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 9 Dec 2025 22:34:49 -0800 Subject: [PATCH 2/8] update tests Signed-off-by: yeya24 --- integration/parquet_querier_test.go | 16 +++++++++------- pkg/querier/parquet_queryable.go | 3 ++- pkg/querier/parquet_queryable_test.go | 22 +++++++++++++++++++++- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index ad797b2dbf2..ef6d68ad4dd 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -305,7 +305,7 @@ func TestParquetProjectionPushdown(t *testing.T) { testCases := []struct { name string query string - expectedLabels []string // Labels that should be present in result (besides __name__) + expectedLabels []string // Labels that should be present in result }{ { name: "simple_sum_by_job", @@ -346,17 +346,19 @@ func TestParquetProjectionPushdown(t *testing.T) { actualLabels[label.Name] = struct{}{} } + // Check that all expected labels are present + for _, expectedLabel := range tc.expectedLabels { + _, ok := actualLabels[expectedLabel] + require.True(t, ok, + "series should have %s label", expectedLabel) + } + // Check that no unexpected labels are present for lbl := range actualLabels { if !slices.Contains(tc.expectedLabels, lbl) { - require.Fail(t, "series should not have %s label", lbl) + require.Fail(t, "series should not have unexpected label: %s", lbl) } } - // Check that all expected labels are present - for _, expectedLabel := range tc.expectedLabels { - require.True(t, actualLabels[expectedLabel], - "series should have %s label", expectedLabel) - } } }) } diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 4872eade8d7..89e89b58ef2 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -510,7 +510,8 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool } queryIngesters := q.queryIngestersWithin == 0 || maxt >= util.TimeToMillis(time.Now().Add(-q.queryIngestersWithin).Add(-q.projectionHintsIngesterBuffer)) - disableProjection := len(remaining) > 0 || queryIngesters + // Only enable projection if ProjectionInclude is true. + disableProjection := len(remaining) > 0 || queryIngesters || !hints.ProjectionInclude // Reset projection hints if there are mixed blocks (both parquet and non-parquet) or the query needs to merge results between ingester and parquet blocks if disableProjection { hints.ProjectionLabels = nil diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index fdea54b3013..c867f0dbd9f 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -643,6 +643,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer time.Duration hasRemainingBlocks bool // Whether there are non-parquet (TSDB) blocks inputProjectionLabels []string + inputProjectionInclude bool // Input ProjectionInclude value expectedProjectionLabels []string // nil means projection disabled expectedProjectionInclude bool }{ @@ -653,6 +654,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: []string{"__name__", "job"}, // Preserved expectedProjectionInclude: true, }, @@ -663,6 +665,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: true, // Mixed blocks inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset expectedProjectionInclude: false, }, @@ -673,6 +676,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset (maxT >= now - 2h - 1h = now - 3h) expectedProjectionInclude: false, }, @@ -683,6 +687,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset (maxT = now - 2.5h, threshold = now - 3h, so 2.5h > 3h means disabled) expectedProjectionInclude: false, }, @@ -693,6 +698,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset expectedProjectionInclude: false, }, @@ -703,6 +709,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: []string{"__name__", "job"}, // Preserved expectedProjectionInclude: true, }, @@ -713,6 +720,7 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 0, // No buffer hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: []string{"__name__", "job"}, // Preserved expectedProjectionInclude: true, }, @@ -723,9 +731,21 @@ func TestSelectProjectionHints(t *testing.T) { projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset expectedProjectionInclude: false, }, + "projection disabled: ProjectionInclude is false, disable projection": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + inputProjectionLabels: []string{"job"}, + inputProjectionInclude: false, + expectedProjectionLabels: nil, + expectedProjectionInclude: false, + }, } for testName, testData := range tests { @@ -782,7 +802,7 @@ func TestSelectProjectionHints(t *testing.T) { Start: testData.minT, End: testData.maxT, ProjectionLabels: testData.inputProjectionLabels, - ProjectionInclude: true, + ProjectionInclude: testData.inputProjectionInclude, } // Execute Select From b29a5f72b723b434218896803b2a4889856057d4 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 11 Dec 2025 09:07:37 -0800 Subject: [PATCH 3/8] upgrade parquet common version Signed-off-by: yeya24 --- go.mod | 2 +- go.sum | 6 +++--- integration/parquet_querier_test.go | 20 ++++++++++++++----- .../parquet-common/search/materialize.go | 3 ++- vendor/modules.txt | 2 +- 5 files changed, 22 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index a5f5e66dee1..9a26035ca69 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 + github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index e8d973c9536..de4f4b193d6 100644 --- a/go.sum +++ b/go.sum @@ -1590,7 +1590,7 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing-contrib/go-grpc v0.1.2 h1:MP16Ozc59kqqwn1v18aQxpeGZhsBanJ2iurZYaQSZ+g= github.com/opentracing-contrib/go-grpc v0.1.2/go.mod h1:glU6rl1Fhfp9aXUHkE36K2mR4ht8vih0ekOVlWKEUHM= -github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0= +github.com/opentracing-contrib/go-stdlib v1.1.0 h1:hSJ8yYaiAO/k2YZUeWJWpQCPE2wRCDtxRnir0gU6wbA= github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= @@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk= -github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= +github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8= +github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc= diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index ef6d68ad4dd..542c29e220f 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -13,8 +13,8 @@ import ( "time" "github.com/cortexproject/promqlsmith" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" @@ -301,12 +301,16 @@ func TestParquetProjectionPushdown(t *testing.T) { c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) - // Test queries that should use projection hints testCases := []struct { name string query string expectedLabels []string // Labels that should be present in result }{ + { + name: "vector selector query should not use projection", + query: `http_requests_total`, + expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"}, + }, { name: "simple_sum_by_job", query: `sum by (job) (http_requests_total)`, @@ -322,6 +326,11 @@ func TestParquetProjectionPushdown(t *testing.T) { query: `sum by (job, status_code) (http_requests_total)`, expectedLabels: []string{"job", "status_code"}, }, + { + name: "aggregation without query", + query: `sum without (instance, method) (http_requests_total)`, + expectedLabels: []string{"job", "status_code", "path", "cluster"}, + }, } for _, tc := range testCases { @@ -334,16 +343,17 @@ func TestParquetProjectionPushdown(t *testing.T) { require.NotNil(t, result) // Verify we got results - matrix := result.(promql.Matrix) + matrix, ok := result.(model.Matrix) + require.True(t, ok, "result should be a matrix") require.NotEmpty(t, matrix, "query should return results") t.Logf("Query returned %d series", len(matrix)) // Verify projection worked: series should only have the expected labels - for i, series := range matrix { + for _, series := range matrix { actualLabels := make(map[string]struct{}) for _, label := range series.Metric { - actualLabels[label.Name] = struct{}{} + actualLabels[string(label.Name)] = struct{}{} } // Check that all expected labels are present diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 24daad7ef26..01c854275cf 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -467,7 +467,8 @@ func (m *Materializer) MaterializeLabels(ctx context.Context, hints *prom_storag break } } - needsHash = !seriesHashExcluded + // Hash column is only needed if projection labels are provided and series hash is not excluded. + needsHash = !seriesHashExcluded && len(hints.ProjectionLabels) > 0 for _, labelName := range hints.ProjectionLabels { if labelName == schema.SeriesHashColumn { diff --git a/vendor/modules.txt b/vendor/modules.txt index d4aecb1b3e7..91fa9c2d44c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -952,7 +952,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 +# github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 ## explicit; go 1.24.0 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable From 7284e52da7020628099242dbf7ab4b9a426918a4 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 11 Dec 2025 09:54:44 -0800 Subject: [PATCH 4/8] lint Signed-off-by: yeya24 --- go.sum | 2 +- integration/parquet_querier_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.sum b/go.sum index de4f4b193d6..8dab5b15dfe 100644 --- a/go.sum +++ b/go.sum @@ -1590,7 +1590,7 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing-contrib/go-grpc v0.1.2 h1:MP16Ozc59kqqwn1v18aQxpeGZhsBanJ2iurZYaQSZ+g= github.com/opentracing-contrib/go-grpc v0.1.2/go.mod h1:glU6rl1Fhfp9aXUHkE36K2mR4ht8vih0ekOVlWKEUHM= -github.com/opentracing-contrib/go-stdlib v1.1.0 h1:hSJ8yYaiAO/k2YZUeWJWpQCPE2wRCDtxRnir0gU6wbA= +github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0= github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index 542c29e220f..f24462b0d8e 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -353,7 +353,7 @@ func TestParquetProjectionPushdown(t *testing.T) { for _, series := range matrix { actualLabels := make(map[string]struct{}) for _, label := range series.Metric { - actualLabels[string(label.Name)] = struct{}{} + actualLabels[string(label)] = struct{}{} } // Check that all expected labels are present From e25a78ed0a2478675be9026601ef25b02cc6fea2 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 11 Dec 2025 19:20:46 -0800 Subject: [PATCH 5/8] don't projection if any block doesn't have hash column Signed-off-by: yeya24 --- integration/parquet_querier_test.go | 2 +- pkg/querier/parquet_queryable.go | 19 +++- pkg/querier/parquet_queryable_test.go | 145 +++++++++++++++++++++++--- 3 files changed, 150 insertions(+), 16 deletions(-) diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index f24462b0d8e..a4e38d0e438 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -219,7 +219,7 @@ func TestParquetProjectionPushdown(t *testing.T) { "-parquet-converter.enabled": "true", // Querier - Enable Thanos engine with projection optimizer "-querier.thanos-engine": "true", - "-querier.optimizers": "default,projection", // Enable projection optimizer + "-querier.optimizers": "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection "-querier.enable-parquet-queryable": "true", "-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 89e89b58ef2..78ba15b350a 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -28,6 +28,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util" @@ -511,8 +512,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool queryIngesters := q.queryIngestersWithin == 0 || maxt >= util.TimeToMillis(time.Now().Add(-q.queryIngestersWithin).Add(-q.projectionHintsIngesterBuffer)) // Only enable projection if ProjectionInclude is true. - disableProjection := len(remaining) > 0 || queryIngesters || !hints.ProjectionInclude - // Reset projection hints if there are mixed blocks (both parquet and non-parquet) or the query needs to merge results between ingester and parquet blocks + disableProjection := len(remaining) > 0 || queryIngesters || !hints.ProjectionInclude || !allParquetBlocksHaveHashColumn(parquet) + // Reset projection hints if: + // - there are mixed blocks (both parquet and non-parquet) + // - the query needs to merge results between ingester and parquet blocks + // - not all parquet blocks have hash column (version < 2) if disableProjection { hints.ProjectionLabels = nil hints.ProjectionInclude = false @@ -617,6 +621,17 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining } } +// allParquetBlocksHaveHashColumn checks if all parquet blocks have version >= 2, which means they have the hash column. +// Parquet blocks with version 1 don't have the hash column, so projection cannot be enabled for them. +func allParquetBlocksHaveHashColumn(blocks []*bucketindex.Block) bool { + for _, b := range blocks { + if b.Parquet == nil || b.Parquet.Version < cortex_parquet.ParquetConverterMarkVersion2 { + return false + } + } + return true +} + type shardMatcherLabelsFilter struct { shardMatcher *storepb.ShardMatcher } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index c867f0dbd9f..912d5147c55 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -173,7 +173,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -242,8 +242,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select", func(t *testing.T) { @@ -316,8 +316,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select", func(t *testing.T) { @@ -642,17 +642,20 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin time.Duration projectionHintsIngesterBuffer time.Duration hasRemainingBlocks bool // Whether there are non-parquet (TSDB) blocks + parquetBlockVersion int // Version of parquet blocks (1 or 2) + mixedVersions bool // If true, block1 is v1, block2 is v2 inputProjectionLabels []string inputProjectionInclude bool // Input ProjectionInclude value expectedProjectionLabels []string // nil means projection disabled expectedProjectionInclude bool }{ - "projection enabled: all parquet blocks, query older than ingester window": { + "projection enabled: all parquet blocks v2, query older than ingester window": { minT: util.TimeToMillis(now.Add(-10 * time.Hour)), maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, // Version 2 has hash column inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: []string{"__name__", "job"}, // Preserved @@ -664,6 +667,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: true, // Mixed blocks + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset @@ -675,6 +679,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset (maxT >= now - 2h - 1h = now - 3h) @@ -686,6 +691,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset (maxT = now - 2.5h, threshold = now - 3h, so 2.5h > 3h means disabled) @@ -697,6 +703,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 0, // Always query ingesters projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset @@ -708,6 +715,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: []string{"__name__", "job"}, // Preserved @@ -719,6 +727,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 0, // No buffer hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: []string{"__name__", "job"}, // Preserved @@ -730,6 +739,7 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"__name__", "job"}, inputProjectionInclude: true, expectedProjectionLabels: nil, // Reset @@ -741,11 +751,36 @@ func TestSelectProjectionHints(t *testing.T) { queryIngestersWithin: 2 * time.Hour, projectionHintsIngesterBuffer: 1 * time.Hour, hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, inputProjectionLabels: []string{"job"}, inputProjectionInclude: false, expectedProjectionLabels: nil, expectedProjectionInclude: false, }, + "projection disabled: parquet blocks version 1 (no hash column)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion1, // Version 1 doesn't have hash column + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset because version 1 doesn't support projection + expectedProjectionInclude: false, + }, + "projection disabled: mixed parquet block versions (v1 and v2)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryIngestersWithin: 2 * time.Hour, + projectionHintsIngesterBuffer: 1 * time.Hour, + hasRemainingBlocks: false, + mixedVersions: true, // block1 is v1, block2 is v2 + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset because not all blocks support projection + expectedProjectionInclude: false, + }, } for testName, testData := range tests { @@ -758,14 +793,20 @@ func TestSelectProjectionHints(t *testing.T) { if testData.hasRemainingBlocks { // Mixed: one parquet, one TSDB blocks = bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, &bucketindex.Block{ID: block2}, // No parquet metadata = TSDB block } + } else if testData.mixedVersions { + // Mixed parquet versions: block1 is v1, block2 is v2 + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + } } else { - // All parquet + // All parquet with same version blocks = bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, } } finder.On("GetBlocks", mock.Anything, "user-1", testData.minT, mock.Anything, mock.Anything).Return(blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark{}, nil) @@ -822,6 +863,84 @@ func TestSelectProjectionHints(t *testing.T) { } } +func TestAllParquetBlocksHaveHashColumn(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + block3 := ulid.MustNew(3, nil) + + tests := map[string]struct { + blocks []*bucketindex.Block + expected bool + }{ + "all blocks v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + "all blocks v1": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + }, + expected: false, + }, + "mixed versions v1 and v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: false, + }, + "one block nil parquet metadata": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + {ID: block2, Parquet: nil}, + }, + expected: false, + }, + "all blocks nil parquet metadata": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: nil}, + {ID: block2, Parquet: nil}, + }, + expected: false, + }, + "single block v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + "single block v1": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + }, + expected: false, + }, + "empty blocks": { + blocks: []*bucketindex.Block{}, + expected: true, // No blocks with version < 2, so return true + }, + "all blocks v3 or higher": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 3}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 4}}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + result := allParquetBlocksHaveHashColumn(testData.blocks) + require.Equal(t, testData.expected, result, "unexpected result for %s", testName) + }) + } +} + func TestMaterializedLabelsFilterCallback(t *testing.T) { tests := []struct { name string @@ -989,7 +1108,7 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { // Set up blocks where block1 has parquet metadata but block2 doesn't finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet &bucketindex.Block{ID: block2}, // Not available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -1048,8 +1167,8 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { // Set up blocks where both blocks have parquet metadata finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select should work without error", func(t *testing.T) { From 025e9fb581f962af766e3523e169697aee60f655 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 11 Dec 2025 22:40:34 -0800 Subject: [PATCH 6/8] fix test Signed-off-by: yeya24 --- integration/parquet_querier_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index a4e38d0e438..1d66724e5a2 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -343,16 +343,16 @@ func TestParquetProjectionPushdown(t *testing.T) { require.NotNil(t, result) // Verify we got results - matrix, ok := result.(model.Matrix) - require.True(t, ok, "result should be a matrix") - require.NotEmpty(t, matrix, "query should return results") + vector, ok := result.(model.Vector) + require.True(t, ok, "result should be a vector") + require.NotEmpty(t, vector, "query should return results") - t.Logf("Query returned %d series", len(matrix)) + t.Logf("Query returned %d series", len(vector)) // Verify projection worked: series should only have the expected labels - for _, series := range matrix { + for _, sample := range vector { actualLabels := make(map[string]struct{}) - for _, label := range series.Metric { + for _, label := range sample.Metric { actualLabels[string(label)] = struct{}{} } From 4e48c6b86f73ae33b2e9e95d3c92a54bb7063098 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 12 Dec 2025 00:11:10 -0800 Subject: [PATCH 7/8] wait longer by checking bucket index file contains parquet file Signed-off-by: yeya24 --- integration/parquet_querier_test.go | 32 ++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index 1d66724e5a2..b5e4ad24dbf 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -26,6 +26,7 @@ import ( "github.com/cortexproject/cortex/integration/e2ecortex" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/log" cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" ) @@ -280,22 +281,33 @@ func TestParquetProjectionPushdown(t *testing.T) { err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) require.NoError(t, err) - // Wait until we convert the blocks to parquet + // Wait until we convert the blocks to parquet AND bucket index is updated cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { - found := false - foundBucketIndex := false - + // Check if parquet marker exists + markerFound := false err := bkt.Iter(context.Background(), "", func(name string) error { if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { - found = true - } - if name == "bucket-index.json.gz" { - foundBucketIndex = true + markerFound = true } return nil }, objstore.WithRecursiveIter()) - require.NoError(t, err) - return found && foundBucketIndex + if err != nil || !markerFound { + return false + } + + // Check if bucket index exists AND contains the parquet block metadata + idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger) + if err != nil { + return false + } + + // Verify the block is in the bucket index with parquet metadata + for _, b := range idx.Blocks { + if b.ID == id && b.Parquet != nil { + return true + } + } + return false }) c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") From a1bbfbb371415deba8b72e77fe620416a6f7cca1 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 14 Dec 2025 17:33:39 -0800 Subject: [PATCH 8/8] fix tests Signed-off-by: yeya24 --- integration/parquet_querier_test.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index b5e4ad24dbf..b7ad18f6d68 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -25,6 +25,7 @@ import ( e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/log" @@ -180,7 +181,7 @@ func TestParquetFuzz(t *testing.T) { labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) } -func TestParquetProjectionPushdown(t *testing.T) { +func TestParquetProjectionPushdownFuzz(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) defer s.Close() @@ -247,14 +248,12 @@ func TestParquetProjectionPushdown(t *testing.T) { statusCodes := []string{"200", "400", "404", "500", "502"} methods := []string{"GET", "POST", "PUT", "DELETE"} now := time.Now() - // Make sure query time is old enough to not overlap with ingesters - // With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters - // Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled - start := now.Add(-time.Hour * 48) - end := now.Add(-time.Hour * 24) + // Make sure query time is old enough to not overlap with ingesters. + start := now.Add(-time.Hour * 72) + end := now.Add(-time.Hour * 48) // Create series with multiple labels - for i := 0; i < numSeries; i++ { + for i := range numSeries { lbls = append(lbls, labels.FromStrings( labels.MetricName, "http_requests_total", "job", "api-server", @@ -276,16 +275,17 @@ func TestParquetProjectionPushdown(t *testing.T) { storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) require.NoError(t, err) - bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + bkt := storage.GetBucket() + userBucket := bucket.NewUserBucketClient("user-1", bkt, nil) - err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc) require.NoError(t, err) // Wait until we convert the blocks to parquet AND bucket index is updated - cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { + cortex_testutil.Poll(t, 300*time.Second, true, func() interface{} { // Check if parquet marker exists markerFound := false - err := bkt.Iter(context.Background(), "", func(name string) error { + err := userBucket.Iter(context.Background(), "", func(name string) error { if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { markerFound = true } @@ -304,6 +304,7 @@ func TestParquetProjectionPushdown(t *testing.T) { // Verify the block is in the bucket index with parquet metadata for _, b := range idx.Blocks { if b.ID == id && b.Parquet != nil { + require.True(t, b.Parquet.Version == cortex_parquet.CurrentVersion) return true } } @@ -364,7 +365,7 @@ func TestParquetProjectionPushdown(t *testing.T) { // Verify projection worked: series should only have the expected labels for _, sample := range vector { actualLabels := make(map[string]struct{}) - for _, label := range sample.Metric { + for label := range sample.Metric { actualLabels[string(label)] = struct{}{} }