Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [ENHANCEMENT] Store Gateway: Resolve the parquet shard count from the bucket index instead of reading the converter mark for each block, reducing object storage calls when the bucket index is enabled. A `component` label is added to the bucket index loader metrics to distinguish store-queryable and store-gateway. #7648
* [ENHANCEMENT] Query Frontend: Improve the slow query log with `source`, `user_agent`, `engine_type`, `block_store_type`, and query stats fields to aid slow query diagnosis. #7601
* [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626
* [ENHANCEMENT] Metrics: Add native histogram support to all remaining production histograms, enabling dual-format (classic + native) exposition across all Cortex components. #7636
Expand Down
5 changes: 3 additions & 2 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -2006,13 +2006,14 @@ blocks_storage:
[enabled: <boolean> | default = true]

# How frequently a bucket index, which previously failed to load, should
# be tried to load again. This option is used only by querier.
# be tried to load again. This option is used by querier and store-gateway
# parquet mode.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
[update_on_error_interval: <duration> | default = 1m]

# How long a unused bucket index should be cached. Once this timeout
# expires, the unused bucket index is removed from the in-memory cache.
# This option is used only by querier.
# This option is used by querier and store-gateway parquet mode.
# CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
[idle_timeout: <duration> | default = 1h]

Expand Down
5 changes: 3 additions & 2 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -2064,13 +2064,14 @@ blocks_storage:
[enabled: <boolean> | default = true]

# How frequently a bucket index, which previously failed to load, should
# be tried to load again. This option is used only by querier.
# be tried to load again. This option is used by querier and store-gateway
# parquet mode.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
[update_on_error_interval: <duration> | default = 1m]

# How long a unused bucket index should be cached. Once this timeout
# expires, the unused bucket index is removed from the in-memory cache.
# This option is used only by querier.
# This option is used by querier and store-gateway parquet mode.
# CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
[idle_timeout: <duration> | default = 1h]

Expand Down
5 changes: 3 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2650,13 +2650,14 @@ bucket_store:
[enabled: <boolean> | default = true]

# How frequently a bucket index, which previously failed to load, should be
# tried to load again. This option is used only by querier.
# tried to load again. This option is used by querier and store-gateway
# parquet mode.
# CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval
[update_on_error_interval: <duration> | default = 1m]

# How long a unused bucket index should be cached. Once this timeout
# expires, the unused bucket index is removed from the in-memory cache. This
# option is used only by querier.
# option is used by querier and store-gateway parquet mode.
# CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout
[idle_timeout: <duration> | default = 1h]

Expand Down
11 changes: 11 additions & 0 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,20 @@ func TestParquetMultiShardQuery(t *testing.T) {
if tc.viaStoreGateway {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_storegateway_instances_hit_per_query"}, e2e.WithMetricCount, e2e.SkipMissingMetrics))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_blocks_consistency_checks_total"}, e2e.SkipMissingMetrics))
// The store-gateway parquet bucket store resolves the shard count via the
// bucket index loader, which is tagged with component="store-gateway".
if tc.extraStoreGateways == 0 {
// Only assert on the single binary when there are no extra replicas, since
// with replication the query may be served by another store-gateway.
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_bucket_index_loads_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
}
} else {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
// The querier's bucket index loader is tagged with component="store-queryable".
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_bucket_index_loads_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "component", "store-queryable"))))
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
MaxStalePeriod: storageCfg.BucketStore.BucketIndex.MaxStalePeriod,
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
}, bucketClient, limits, logger, reg)
}, bucketClient, limits, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-queryable"}, reg))
} else {
usersScanner, err := users.NewScanner(storageCfg.UsersScanner, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ type BucketIndexConfig struct {

// RegisterFlagsWithPrefix registers the bucket index flags on f, prepending
// prefix to every flag name.
//
// Each flag name is registered exactly once: flag.FlagSet panics when a flag
// name is defined a second time on the same set.
func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	f.BoolVar(&cfg.Enabled, prefix+"enabled", true, "True to enable querier and store-gateway to discover blocks in the storage via bucket index instead of bucket scanning. Disabling the bucket index is not recommended for production.")
	f.DurationVar(&cfg.UpdateOnErrorInterval, prefix+"update-on-error-interval", time.Minute, "How frequently a bucket index, which previously failed to load, should be tried to load again. This option is used by querier and store-gateway parquet mode.")
	f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used by querier and store-gateway parquet mode.")
	f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).")
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type BucketStores interface {
storepb.StoreServer
SyncBlocks(ctx context.Context) error
InitialSync(ctx context.Context) error
Stop() error
}

// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore.
Expand Down Expand Up @@ -230,6 +231,9 @@ func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error {
})
}

// Stop implements BucketStores. It is a no-op that always returns nil, because
// ThanosBucketStores has no background services to stop.
func (u *ThanosBucketStores) Stop() error {
	return nil
}

func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
retries := backoff.New(ctx, backoff.Config{
MinBackoff: 1 * time.Second,
Expand Down
3 changes: 3 additions & 0 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ func (g *StoreGateway) running(ctx context.Context) error {
}

func (g *StoreGateway) stopping(_ error) error {
if g.stores != nil {
_ = g.stores.Stop()
}
if g.subservices != nil {
return services.StopManagerAndAwaitStopped(context.Background(), g.subservices)
}
Expand Down
67 changes: 52 additions & 15 deletions pkg/storegateway/parquet_bucket_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ import (
"google.golang.org/grpc/status"

cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/parquetutil"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type parquetBucketStore struct {
logger log.Logger
bucket objstore.InstrumentedBucket
limits *validation.Overrides
concurrency int
logger log.Logger
bucket objstore.InstrumentedBucket
indexLoader *bucketindex.Loader // nil when bucket index disabled
limits *validation.Overrides
userID string
bucketIndexEnabled bool
concurrency int

chunksDecoder *schema.PrometheusParquetChunksDecoder

Expand Down Expand Up @@ -67,20 +71,15 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket)
noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx))

// Read converter marks and expand to per-shard (blockID, shardID) lists.
// TODO(Sungjin1212): Read the shard count from the bucket index instead of reading the converter mark for each block.
shardCounts, err := p.resolveShardCounts(ctx, blockIDs)
if err != nil {
return nil, err
}

var shardBlockIDs []string
var shardIDs []int
for _, blockID := range blockIDs {
uid, err := ulid.Parse(blockID)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID)
}
marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger)
if err != nil {
return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID)
}
numShards := marker.Shards
numShards := shardCounts[blockID]
if numShards <= 0 {
// backward compatibility: blocks without a shard count have one shard
numShards = 1
Expand Down Expand Up @@ -112,6 +111,44 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
return parquetBlocks, nil
}

// resolveShardCounts returns the number of parquet shards for the requested
// block IDs, keyed by block ID string.
//
// When the bucket index is enabled and a loader is available, the shard counts
// are taken from the tenant's bucket index. Only the requested blocks are
// copied into the result; an indexed block without a positive parquet shard
// count is reported as 1, and a requested block that is absent from the index
// is left out of the map (so a lookup yields 0).
//
// Otherwise it falls back to reading the converter mark of every requested
// block and stores the mark's shard count as-is.
//
// Callers should treat a missing or non-positive count as a single shard.
//
// An error is returned when the bucket index cannot be loaded, or, on the
// fallback path, when a block ID cannot be parsed or its converter mark cannot
// be read.
func (p *parquetBucketStore) resolveShardCounts(ctx context.Context, blockIDs []string) (map[string]int, error) {
	shardCounts := make(map[string]int, len(blockIDs))

	if p.bucketIndexEnabled && p.indexLoader != nil {
		idx, _, err := p.indexLoader.GetIndex(ctx, p.userID)
		if err != nil {
			return nil, errors.Wrap(err, "failed to get bucket index")
		}

		// The index lists every block of the tenant, which can be far more than
		// the handful a single request touches. Track the requested IDs so the
		// result stays bounded by len(blockIDs) instead of by the index size.
		requested := make(map[string]struct{}, len(blockIDs))
		for _, blockID := range blockIDs {
			requested[blockID] = struct{}{}
		}

		for _, b := range idx.Blocks {
			id := b.ID.String()
			if _, ok := requested[id]; !ok {
				continue
			}
			numShards := 1
			if b.Parquet != nil && b.Parquet.Shards > 0 {
				numShards = b.Parquet.Shards
			}
			shardCounts[id] = numShards
		}
		return shardCounts, nil
	}

	// Fallback: read the converter mark for each block.
	for _, blockID := range blockIDs {
		uid, err := ulid.Parse(blockID)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID)
		}
		marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID)
		}
		shardCounts[blockID] = marker.Shards
	}
	return shardCounts, nil
}

// Series implements the store interface for a single parquet bucket store
func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
spanLog, ctx := spanlogger.New(seriesSrv.Context(), "ParquetBucketStore.Series")
Expand Down
47 changes: 39 additions & 8 deletions pkg/storegateway/parquet_bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -31,9 +32,11 @@ import (
"github.com/cortexproject/cortex/pkg/querysharding"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_util "github.com/cortexproject/cortex/pkg/util"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
"github.com/cortexproject/cortex/pkg/util/parquetutil"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/users"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand All @@ -60,6 +63,10 @@ type ParquetBucketStores struct {
rowRangesCache search.RowRangesForConstraintsCache

inflightRequests *cortex_util.InflightRequestTracker

// indexLoader lazily loads and caches the per-tenant bucket index in memory
// It is non-nil only when BucketIndex.Enabled.
indexLoader *bucketindex.Loader
}

// newParquetBucketStores creates a new multi-tenant parquet bucket stores
Expand Down Expand Up @@ -108,6 +115,15 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore.
u.matcherCache = storecache.NoopMatchersCache
}

if cfg.BucketStore.BucketIndex.Enabled {
u.indexLoader = bucketindex.NewLoader(bucketindex.LoaderConfig{
CheckInterval: time.Minute,
UpdateOnStaleInterval: cfg.BucketStore.SyncInterval,
UpdateOnErrorInterval: cfg.BucketStore.BucketIndex.UpdateOnErrorInterval,
IdleTimeout: cfg.BucketStore.BucketIndex.IdleTimeout,
}, bucketClient, limits, logger, reg)
}

return u, nil
}

Expand Down Expand Up @@ -218,6 +234,18 @@ func (u *ParquetBucketStores) SyncBlocks(ctx context.Context) error {

// InitialSync implements BucketStores. It starts the bucket index loader, if
// one is set, and waits until it is running; without a loader it does nothing
// and returns nil.
func (u *ParquetBucketStores) InitialSync(ctx context.Context) error {
	loader := u.indexLoader
	if loader == nil {
		// No bucket index loader configured: nothing to start.
		return nil
	}
	return services.StartAndAwaitRunning(ctx, loader)
}

// Stop implements BucketStores. It stops the bucket index loader, if one is
// set, and waits for it to terminate; without a loader it does nothing and
// returns nil.
func (u *ParquetBucketStores) Stop() error {
	loader := u.indexLoader
	if loader == nil {
		// No bucket index loader configured: nothing to stop.
		return nil
	}
	return services.StopAndAwaitTerminated(context.Background(), loader)
}

Expand Down Expand Up @@ -270,14 +298,17 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger
userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits)

store := &parquetBucketStore{
logger: userLogger,
bucket: userBucket,
limits: u.limits,
concurrency: u.cfg.BucketStore.ParquetQueryConcurrency,
chunksDecoder: u.chunksDecoder,
matcherCache: u.matcherCache,
parquetShardCache: u.parquetShardCache,
rowRangesCache: u.rowRangesCache,
logger: userLogger,
bucket: userBucket,
indexLoader: u.indexLoader,
limits: u.limits,
userID: userID,
bucketIndexEnabled: u.cfg.BucketStore.BucketIndex.Enabled,
concurrency: u.cfg.BucketStore.ParquetQueryConcurrency,
chunksDecoder: u.chunksDecoder,
matcherCache: u.matcherCache,
parquetShardCache: u.parquetShardCache,
rowRangesCache: u.rowRangesCache,
}

return store, nil
Expand Down
Loading
Loading