diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0f3f71dc90..e3d5573c47 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,10 +8,12 @@
 * [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
 * [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
 * [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
+* [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency. #10189
 * [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
 * [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
 * [BUGFIX] Ruler: fix indeterminate rules being always run concurrently (instead of never) when `-ruler.max-independent-rule-evaluation-concurrency` is set. https://github.com/prometheus/prometheus/pull/15560 #10258
 * [BUGFIX] PromQL: Fix various UTF-8 bugs related to quoting. https://github.com/prometheus/prometheus/pull/15531 #10258
+* [BUGFIX] Ruler: Fixed an issue with the experimental `-ruler.max-independent-rule-evaluation-concurrency` feature where an eligible rule group would flap between concurrent and sequential evaluation, because the at-risk decision was based on the wall-clock duration of the previous, possibly concurrent, run. #9726 #10189
 
 ### Mixin
 
diff --git a/pkg/ruler/manager_metrics.go b/pkg/ruler/manager_metrics.go
index 9f5a3953fe..784b07340f 100644
--- a/pkg/ruler/manager_metrics.go
+++ b/pkg/ruler/manager_metrics.go
@@ -25,6 +25,7 @@ type ManagerMetrics struct {
 	GroupInterval            *prometheus.Desc
 	GroupLastEvalTime        *prometheus.Desc
 	GroupLastDuration        *prometheus.Desc
+	GroupLastRuleDurationSum *prometheus.Desc
 	GroupLastRestoreDuration *prometheus.Desc
 	GroupRules               *prometheus.Desc
 	GroupLastEvalSamples     *prometheus.Desc
@@ -89,6 +90,12 @@ func NewManagerMetrics(logger log.Logger) *ManagerMetrics {
 			[]string{"user", "rule_group"},
 			nil,
 		),
+		GroupLastRuleDurationSum: prometheus.NewDesc(
+			"cortex_prometheus_rule_group_last_rule_duration_sum_seconds",
+			"The sum of time in seconds it took to evaluate each rule in the group regardless of concurrency. This should be higher than the group duration if rules are evaluated concurrently.",
+			[]string{"user", "rule_group"},
+			nil,
+		),
 		GroupLastRestoreDuration: prometheus.NewDesc(
 			"cortex_prometheus_rule_group_last_restore_duration_seconds",
 			"The duration of the last alert rules alerts restoration using the `ALERTS_FOR_STATE` series across all rule groups.",
@@ -131,6 +138,7 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) {
 	out <- m.GroupInterval
 	out <- m.GroupLastEvalTime
 	out <- m.GroupLastDuration
+	out <- m.GroupLastRuleDurationSum
 	out <- m.GroupLastRestoreDuration
 	out <- m.GroupRules
 	out <- m.GroupLastEvalSamples
@@ -156,6 +164,7 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
 	data.SendSumOfGaugesPerTenant(out, m.GroupInterval, "prometheus_rule_group_interval_seconds", dskit_metrics.WithLabels("rule_group"))
 	data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalTime, "prometheus_rule_group_last_evaluation_timestamp_seconds", dskit_metrics.WithLabels("rule_group"))
 	data.SendSumOfGaugesPerTenant(out, m.GroupLastDuration, "prometheus_rule_group_last_duration_seconds", dskit_metrics.WithLabels("rule_group"))
+	data.SendSumOfGaugesPerTenant(out, m.GroupLastRuleDurationSum, "prometheus_rule_group_last_rule_duration_sum_seconds", dskit_metrics.WithLabels("rule_group"))
 	data.SendSumOfGaugesPerTenant(out, m.GroupRules, "prometheus_rule_group_rules", dskit_metrics.WithLabels("rule_group"))
 	data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalSamples, "prometheus_rule_group_last_evaluation_samples", dskit_metrics.WithLabels("rule_group"))
 }
diff --git a/pkg/ruler/rule_concurrency.go b/pkg/ruler/rule_concurrency.go
index 964ea9a326..ba4a506c2a 100644
--- a/pkg/ruler/rule_concurrency.go
+++ b/pkg/ruler/rule_concurrency.go
@@ -190,9 +190,19 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou
 // isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold.
 func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
 	interval := group.Interval().Seconds()
-	lastEvaluation := group.GetEvaluationTime().Seconds()
+	runtimeThreshold := interval * c.thresholdRuleConcurrency / 100
 
-	return lastEvaluation >= interval*c.thresholdRuleConcurrency/100
+	// If the group's wall-clock evaluation time meets or exceeds the threshold, the group is at risk.
+	if group.GetEvaluationTime().Seconds() >= runtimeThreshold {
+		return true
+	}
+
+	// If the sum of the individual rule evaluation times meets or exceeds the threshold, the group is at risk.
+	if group.GetRuleEvaluationTimeSum().Seconds() >= runtimeThreshold {
+		return true
+	}
+
+	return false
 }
 
 // isRuleIndependent checks if the rule is independent of other rules.
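For intuition, the new check can be read as: a group is at risk if either its wall-clock evaluation time or the sum of its per-rule evaluation times crosses the same percentage-of-interval threshold. Below is a minimal, self-contained sketch of that logic; `isAtRisk`, its parameters, and the example numbers are illustrative inventions, while the two inputs and the threshold calculation mirror `GetEvaluationTime`, `GetRuleEvaluationTimeSum`, and `thresholdRuleConcurrency` from the diff above:

```go
package main

import (
	"fmt"
	"time"
)

// isAtRisk mirrors the shape of isGroupAtRisk: the group is at risk when
// either its last wall-clock evaluation time or the sum of its per-rule
// evaluation times reaches the threshold, expressed as a percentage of
// the group's evaluation interval.
func isAtRisk(interval, evalTime, ruleDurationSum time.Duration, thresholdPercent float64) bool {
	runtimeThreshold := interval.Seconds() * thresholdPercent / 100

	// Wall-clock duration of the last group evaluation.
	if evalTime.Seconds() >= runtimeThreshold {
		return true
	}
	// Sum of per-rule durations: unlike wall-clock time, this stays roughly
	// the same whether or not the rules ran concurrently, which is what
	// keeps the group from flapping in and out of concurrency.
	return ruleDurationSum.Seconds() >= runtimeThreshold
}

func main() {
	interval := time.Minute
	// Sequential run: 60s of wall-clock time crosses the 30s (50%) threshold.
	fmt.Println(isAtRisk(interval, 60*time.Second, 60*time.Second, 50)) // true
	// Concurrent run: only 6s of wall-clock time, but the 60s rule-duration
	// sum still crosses the threshold, so the group stays at risk.
	fmt.Println(isAtRisk(interval, 6*time.Second, 60*time.Second, 50)) // true
}
```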
diff --git a/pkg/ruler/rule_concurrency_test.go b/pkg/ruler/rule_concurrency_test.go index ac953d2c4a..64be33715c 100644 --- a/pkg/ruler/rule_concurrency_test.go +++ b/pkg/ruler/rule_concurrency_test.go @@ -5,6 +5,7 @@ package ruler import ( "bytes" "context" + "fmt" "testing" "time" @@ -12,8 +13,10 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/util/teststorage" "github.com/stretchr/testify/require" "go.uber.org/atomic" "golang.org/x/sync/semaphore" @@ -264,11 +267,51 @@ func TestIsRuleIndependent(t *testing.T) { } func TestGroupAtRisk(t *testing.T) { - exp, err := parser.ParseExpr("vector(1)") - require.NoError(t, err) - rule1 := rules.NewRecordingRule("test", exp, labels.Labels{}) - rule1.SetNoDependencyRules(true) - rule1.SetNoDependentRules(true) + createAndEvalTestGroup := func(interval time.Duration, evalConcurrently bool) *rules.Group { + st := teststorage.New(t) + defer st.Close() + + // Create 100 rules that all take 1ms to evaluate. + var createdRules []rules.Rule + ruleCt := 100 + ruleWaitTime := 1 * time.Millisecond + for i := 0; i < ruleCt; i++ { + q, err := parser.ParseExpr("vector(1)") + require.NoError(t, err) + rule := rules.NewRecordingRule(fmt.Sprintf("test_rule%d", i), q, labels.Labels{}) + rule.SetNoDependencyRules(true) + rule.SetNoDependentRules(true) + createdRules = append(createdRules, rule) + } + + // Create the group and evaluate it + opts := rules.GroupOptions{ + Interval: interval, + Opts: &rules.ManagerOptions{ + Appendable: st, + QueryFunc: func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) { + time.Sleep(ruleWaitTime) + return promql.Vector{}, nil + }, + }, + Rules: createdRules, + } + if evalConcurrently { + opts.Opts.RuleConcurrencyController = &allowAllConcurrencyController{} + } + g := rules.NewGroup(opts) + rules.DefaultEvalIterationFunc(context.Background(), g, time.Now()) + + // Sanity check that we're actually running the rules concurrently. + // The group should take less time than the sum of all rules if we're running them concurrently, more otherwise. 
+		if evalConcurrently {
+			require.Less(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime)
+		} else {
+			require.Greater(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime)
+		}
+
+		return g
+	}
 
 	m := newMultiTenantConcurrencyControllerMetrics(prometheus.NewPedanticRegistry())
 	controller := &TenantConcurrencyController{
@@ -284,44 +327,48 @@
 	}
 
 	tc := map[string]struct {
-		group    *rules.Group
-		expected bool
+		groupInterval    time.Duration
+		evalConcurrently bool
+		expected         bool
 	}{
 		"group last evaluation greater than interval": {
-			group: func() *rules.Group {
-				g := rules.NewGroup(rules.GroupOptions{
-					Interval: -1 * time.Minute,
-					Opts:     &rules.ManagerOptions{},
-				})
-				return g
-			}(),
-			expected: true,
+			// Total runtime: 100x1ms ~ 100ms (run sequentially), > 1ms -> At risk
+			groupInterval:    1 * time.Millisecond,
+			evalConcurrently: false,
+			expected:         true,
 		},
 		"group last evaluation less than interval": {
-			group: func() *rules.Group {
-				g := rules.NewGroup(rules.GroupOptions{
-					Interval: 1 * time.Minute,
-					Opts:     &rules.ManagerOptions{},
-				})
-				return g
-			}(),
-			expected: false,
+			// Total runtime: 100x1ms ~ 100ms (run sequentially), < 1s -> Not at risk
+			groupInterval:    1 * time.Second,
+			evalConcurrently: false,
+			expected:         false,
 		},
-		"group last evaluation exactly at concurrency trigger threshold": {
-			group: func() *rules.Group {
-				g := rules.NewGroup(rules.GroupOptions{
-					Interval: 0 * time.Minute,
-					Opts:     &rules.ManagerOptions{},
-				})
-				return g
-			}(),
-			expected: true,
+		"group total rule evaluation duration of last evaluation greater than threshold": {
+			// Total runtime: 100x1ms ~ 100ms, > 50ms -> Group isn't at risk for its runtime, but it is for the sum of all rules.
+			groupInterval:    50 * time.Millisecond,
+			evalConcurrently: true,
+			expected:         true,
+		},
+		"group total rule evaluation duration of last evaluation less than threshold": {
+			// Total runtime: 100x1ms ~ 100ms, < 1s -> Not at risk
+			groupInterval:    1 * time.Second,
+			evalConcurrently: true,
+			expected:         false,
 		},
 	}
 
 	for name, tt := range tc {
 		t.Run(name, func(t *testing.T) {
-			require.Equal(t, tt.expected, controller.isGroupAtRisk(tt.group))
+			group := createAndEvalTestGroup(tt.groupInterval, tt.evalConcurrently)
+			require.Equal(t, tt.expected, controller.isGroupAtRisk(group))
 		})
 	}
 }
+
+type allowAllConcurrencyController struct{}
+
+func (a *allowAllConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
+	return true
+}
+
+func (a *allowAllConcurrencyController) Done(_ context.Context) {}
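To see why the extra duration-sum check stops the flapping described in the changelog entry, consider a hypothetical group with 45s of combined rule time inside a 60s interval and a 50% threshold. All numbers in this sketch are invented for illustration and the two closures only model the shape of the old and new checks, not the actual controller:

```go
package main

import "fmt"

// Hypothetical numbers: a 60s interval with a 50% concurrency threshold,
// rules needing 45s of combined evaluation time, and a wall-clock time of
// 45s when run sequentially but only 9s when run concurrently.
func main() {
	const (
		threshold = 30.0 // 50% of the 60s group interval, in seconds
		seqTime   = 45.0 // wall-clock seconds for a sequential run
		concTime  = 9.0  // wall-clock seconds for a concurrent run
		ruleSum   = 45.0 // sum of per-rule durations; similar either way
	)

	oldAtRisk := func(wallClock float64) bool { return wallClock >= threshold }
	newAtRisk := func(wallClock float64) bool {
		return wallClock >= threshold || ruleSum >= threshold
	}

	wallClock := seqTime
	for run := 1; run <= 4; run++ {
		fmt.Printf("run %d: wall=%gs old=%v new=%v\n",
			run, wallClock, oldAtRisk(wallClock), newAtRisk(wallClock))
		// The next run is concurrent only if the controller saw the group
		// as at risk; with the old check, a fast concurrent run makes the
		// group look safe again, so it flips back to sequential.
		if oldAtRisk(wallClock) {
			wallClock = concTime
		} else {
			wallClock = seqTime
		}
	}
}
```

Running this prints `old=true`, `old=false`, `old=true`, `old=false` across the four runs while `new` stays `true` throughout, matching the test case above where a concurrently evaluated group with a large rule-duration sum is still considered at risk.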