Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit for max range query splits by interval #6458

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4193,6 +4193,13 @@ The `query_range_config` configures the query splitting and caching in the Corte
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]

# Maximum number of splits for a range query, 0 disables it. Uses a multiple of
# `split-queries-by-interval` to maintain the number of splits below the limit.
# If vertical sharding is enabled for a query, the combined total number of
# vertical and interval shards is kept below this limit
# CLI flag: -querier.split-queries-by-interval-max-splits
[split_queries_by_interval_max_splits: <int> | default = 0]

# Mutate incoming queries to align their start and end with their step.
# CLI flag: -querier.align-querier-with-step
[align_queries_with_step: <boolean> | default = false]
Expand Down
22 changes: 15 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
SplitQueriesByIntervalMaxSplits int `yaml:"split_queries_by_interval_max_splits"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

Expand All @@ -50,6 +51,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.IntVar(&cfg.SplitQueriesByIntervalMaxSplits, "querier.split-queries-by-interval-max-splits", 0, "Maximum number of splits for a range query, 0 disables it. Uses a multiple of `split-queries-by-interval` to maintain the number of splits below the limit. If vertical sharding is enabled for a query, the combined total number of vertical and interval shards is kept below this limit")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
Expand All @@ -66,6 +68,9 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.SplitQueriesByIntervalMaxSplits > 0 && cfg.SplitQueriesByInterval <= 0 {
return errors.New("split-queries-by-interval-max-splits requires that a value for split-queries-by-interval is set.")
}
return nil
}

Expand All @@ -89,8 +94,11 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := staticIntervalFn(cfg)
if cfg.SplitQueriesByIntervalMaxSplits != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the limit be applied to both range splits and vertical spits?

func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {

Copy link
Contributor Author

@afhassan afhassan Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this sets a limit for the total range and vertical splits for a given query. The number of vertical shards is static, so the max number of of splits for a given query becomes split_queries_by_interval_max_splits x query_vertical_shard_size. Because of this adding a separate limit for vertical sharding when the number of vertical shards is a static config would be redundant because we limit it already.

intervalFn = dynamicIntervalFn(cfg, queryAnalyzer)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer))
}

var c cache.Cache
Expand Down
23 changes: 23 additions & 0 deletions pkg/querier/tripperware/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
Expand Down Expand Up @@ -135,3 +136,25 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
}
return target
}

func staticIntervalFn(cfg Config) func(r tripperware.Request) time.Duration {
return func(_ tripperware.Request) time.Duration {
return cfg.SplitQueriesByInterval
}
}

func dynamicIntervalFn(cfg Config, queryAnalyzer querysharding.Analyzer) func(r tripperware.Request) time.Duration {
return func(r tripperware.Request) time.Duration {
queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
baseInterval := cfg.SplitQueriesByInterval

analysis, _ := queryAnalyzer.Analyze(r.GetQuery())
afhassan marked this conversation as resolved.
Show resolved Hide resolved
maxSplits := time.Duration(cfg.SplitQueriesByIntervalMaxSplits)
if cfg.VerticalShardSize > 0 && analysis.IsShardable() {
maxSplits = time.Duration(cfg.SplitQueriesByIntervalMaxSplits / cfg.VerticalShardSize)
}

n := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
return n * cfg.SplitQueriesByInterval
}
}
Loading