Skip to content

Commit

Permalink
MQE: track number of processed samples in each query
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Dec 13, 2024
1 parent 33f8016 commit 344bfc7
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 4 deletions.
109 changes: 109 additions & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2807,3 +2807,112 @@ func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) {

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
}

func TestQueryStats(t *testing.T) {
opts := NewTestEngineOpts()
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts.CommonOpts)

start := timestamp.Time(0)
end := start.Add(10 * time.Minute)

storage := promqltest.LoadedStorage(t, `
load 1m
dense_series 0 1 2 3 4 5 6 7 8 9 10
start_series 0 1 _ _ _ _ _ _ _ _ _
end_series _ _ _ _ _ 5 6 7 8 9 10
sparse_series 0 _ _ _ _ _ _ 7 _ _ _
`)

runQueryAndGetTotalSamples := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) int64 {
var q promql.Query
var err error

if isInstantQuery {
q, err = engine.NewInstantQuery(context.Background(), storage, nil, expr, end)
} else {
q, err = engine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, time.Minute)
}

require.NoError(t, err)

defer q.Close()

res := q.Exec(context.Background())
require.NoError(t, res.Err)

return q.Stats().Samples.TotalSamples
}

testCases := map[string]struct {
expr string
isInstantQuery bool
expectedTotalSamples int64
}{
"instant vector selector with point at every time step": {
expr: `dense_series{}`,
expectedTotalSamples: 11,
},
"instant vector selector with points only in start of time range": {
expr: `start_series{}`,
expectedTotalSamples: 2 + 4, // 2 for original points, plus 4 for lookback to last point.
},
"instant vector selector with points only at end of time range": {
expr: `end_series{}`,
expectedTotalSamples: 6,
},
"instant vector selector with sparse points": {
expr: `sparse_series{}`,
expectedTotalSamples: 5 + 4, // 5 for first point at T=0, and 4 for second point at T=7
},

"raw range vector selector with single point": {
expr: `dense_series[45s]`,
isInstantQuery: true,
expectedTotalSamples: 1,
},
"raw range vector selector with multiple points": {
expr: `dense_series[3m45s]`,
isInstantQuery: true,
expectedTotalSamples: 4,
},

"range vector selector with point at every time step": {
expr: `sum_over_time(dense_series{}[30s])`,
expectedTotalSamples: 11,
},
"range vector selector with points only in start of time range": {
expr: `sum_over_time(start_series{}[30s])`,
expectedTotalSamples: 2,
},
"range vector selector with points only at end of time range": {
expr: `sum_over_time(end_series{}[30s])`,
expectedTotalSamples: 6,
},
"range vector selector with sparse points": {
expr: `sum_over_time(sparse_series{}[30s])`,
expectedTotalSamples: 2,
},
"range vector selector where range overlaps previous step's range": {
expr: `sum_over_time(dense_series{}[1m30s])`,
expectedTotalSamples: 21, // Each step except the first selects two points.
},

"expression with multiple selectors": {
expr: `dense_series{} + end_series{}`,
expectedTotalSamples: 11 + 6,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
prometheusCount := runQueryAndGetTotalSamples(t, prometheusEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, prometheusCount, "invalid test case: expected samples does not match value from Prometheus' engine")

mimirCount := runQueryAndGetTotalSamples(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, mimirCount)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type InstantVectorSelector struct {
Selector *Selector
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Stats *types.QueryStats

chunkIterator chunkenc.Iterator
memoizedIterator *storage.MemoizedSeriesIterator
Expand Down Expand Up @@ -119,6 +120,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
continue
}

v.Stats.TotalSamples++

// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Expand Down Expand Up @@ -239,6 +240,7 @@ func TestInstantVectorSelector_SliceSizing(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type RangeVectorSelector struct {
Selector *Selector
Stats *types.QueryStats

rangeMilliseconds int64
chunkIterator chunkenc.Iterator
Expand All @@ -32,9 +33,10 @@ type RangeVectorSelector struct {

var _ types.RangeVectorOperator = &RangeVectorSelector{}

func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *RangeVectorSelector {
func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, stats *types.QueryStats) *RangeVectorSelector {
return &RangeVectorSelector{
Selector: selector,
Stats: stats,
floats: types.NewFPointRingBuffer(memoryConsumptionTracker),
histograms: types.NewHPointRingBuffer(memoryConsumptionTracker),
stepData: &types.RangeVectorStepData{},
Expand Down Expand Up @@ -102,6 +104,8 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err
m.stepData.RangeStart = rangeStart
m.stepData.RangeEnd = rangeEnd

m.Stats.TotalSamples += int64(m.stepData.Floats.Count() + m.stepData.Histograms.Count())

return m.stepData, nil
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Query struct {
cancel context.CancelCauseFunc
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
annotations *annotations.Annotations
stats *types.QueryStats

// Time range of the top-level query.
// Subqueries may use a different range.
Expand Down Expand Up @@ -79,6 +80,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer
qs: qs,
memoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionPerQuery, engine.queriesRejectedDueToPeakMemoryConsumption),
annotations: annotations.New(),
stats: &types.QueryStats{},

statement: &parser.EvalStmt{
Expr: expr,
Expand Down Expand Up @@ -164,6 +166,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types

ExpressionPosition: e.PositionRange(),
},
Stats: q.stats,
}, nil
case *parser.AggregateExpr:
if !q.engine.featureToggles.EnableAggregationOperations {
Expand Down Expand Up @@ -343,7 +346,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.Q
ExpressionPosition: e.PositionRange(),
}

return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil
return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker, q.stats), nil

case *parser.SubqueryExpr:
if !q.engine.featureToggles.EnableSubqueries {
Expand Down Expand Up @@ -829,8 +832,12 @@ func (q *Query) Statement() parser.Statement {
}

func (q *Query) Stats() *stats.Statistics {
// Not yet supported.
return nil
return &stats.Statistics{
Timers: stats.NewQueryTimers(),
Samples: &stats.QuerySamples{
TotalSamples: q.stats.TotalSamples,
},
}
}

func (q *Query) Cancel() {
Expand Down
18 changes: 18 additions & 0 deletions pkg/streamingpromql/types/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package types

// QueryStats tracks statistics about the execution of a single query.
//
// It is not safe to use this type from multiple goroutines simultaneously.
type QueryStats struct {
// The total number of samples processed during the query.
//
// In the case of range vector selectors, each sample is counted once for each time step it appears in.
// For example, if a query is running with a step of 30s with a range vector selector with range 45s,
// then samples in the overlapping 15s are counted twice.
TotalSamples int64
}

0 comments on commit 344bfc7

Please sign in to comment.