Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit for max range query splits by interval #6458

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package queryrange

import (
"flag"
"math"
"time"

"github.com/go-kit/log"
Expand All @@ -34,11 +35,12 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
SplitQueriesByIntervalMaxSplits int `yaml:"split_queries_by_interval_max_splits"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

Expand All @@ -50,6 +52,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.IntVar(&cfg.SplitQueriesByIntervalMaxSplits, "querier.split-queries-by-interval-max-splits", 0, "Maximum number of splits by interval for a query, 0 disables it. Uses a multiple of `split-queries-by-interval` to ensure the number of splits remain below the limit.")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
Expand All @@ -66,6 +69,9 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.SplitQueriesByIntervalMaxSplits > 0 && cfg.SplitQueriesByInterval <= 0 {
return errors.New("split-queries-by-interval-max-splits requires that a value for split-queries-by-interval is set.")
}
return nil
}

Expand All @@ -89,8 +95,16 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
if cfg.SplitQueriesByIntervalMaxSplits != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the limit be applied to both range splits and vertical spits?

func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {

Copy link
Contributor Author

@afhassan afhassan Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this sets a limit for the total range and vertical splits for a given query. The number of vertical shards is static, so the max number of of splits for a given query becomes split_queries_by_interval_max_splits x query_vertical_shard_size. Because of this adding a separate limit for vertical sharding when the number of vertical shards is a static config would be redundant because we limit it already.

intervalFn = func(r tripperware.Request) time.Duration {
queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
baseInterval := cfg.SplitQueriesByInterval
n := int(math.Ceil(float64(queryRange) / float64(baseInterval*time.Duration(cfg.SplitQueriesByIntervalMaxSplits))))
return time.Duration(n) * cfg.SplitQueriesByInterval
}
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer))
}

var c cache.Cache
Expand Down
Loading