Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query-tee: Add an option to skip samples for comparison before a given timestamp #9515

Merged
merged 12 commits into from
Oct 9, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

### Query-tee

* [FEATURE] Added `-proxy.compare-skip-samples-before` to skip samples before the given time when comparing responses. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only. #9515

### Documentation

### Tools
Expand Down
3 changes: 3 additions & 0 deletions cmd/query-tee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/log"
"github.com/grafana/dskit/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/common/model"
jaegercfg "github.com/uber/jaeger-client-go/config"

"github.com/grafana/mimir/pkg/util/instrumentation"
Expand Down Expand Up @@ -106,6 +108,7 @@ func mimirReadRoutes(cfg Config) []querytee.Route {
Tolerance: cfg.ProxyConfig.ValueComparisonTolerance,
UseRelativeError: cfg.ProxyConfig.UseRelativeError,
SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples,
SkipSamplesBefore: model.Time(time.Time(cfg.ProxyConfig.SkipSamplesBefore).UnixMilli()),
RequireExactErrorMatch: cfg.ProxyConfig.RequireExactErrorMatch,
})

Expand Down
3 changes: 3 additions & 0 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/spanlogger"
"github.com/pkg/errors"
Expand All @@ -39,6 +40,7 @@ type ProxyConfig struct {
UseRelativeError bool
PassThroughNonRegisteredRoutes bool
SkipRecentSamples time.Duration
SkipSamplesBefore flagext.Time
RequireExactErrorMatch bool
BackendSkipTLSVerify bool
AddMissingTimeParamToInstantQueries bool
Expand All @@ -65,6 +67,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 2*time.Minute, "The window from now to skip comparing samples. 0 to disable.")
f.Var(&cfg.SkipSamplesBefore, "proxy.compare-skip-samples-before", "Skip the samples before the given time for comparison. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only.")
f.BoolVar(&cfg.RequireExactErrorMatch, "proxy.compare-exact-error-matching", false, "If true, errors will be considered the same only if they are exactly the same. If false, errors will be considered the same if they are considered equivalent.")
f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
f.BoolVar(&cfg.AddMissingTimeParamToInstantQueries, "proxy.add-missing-time-parameter-to-instant-queries", true, "Add a 'time' parameter to proxied instant query requests if they do not have one.")
Expand Down
105 changes: 90 additions & 15 deletions tools/querytee/response_comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type SampleComparisonOptions struct {
Tolerance float64
UseRelativeError bool
SkipRecentSamples time.Duration
SkipSamplesBefore model.Time
RequireExactErrorMatch bool
}

Expand Down Expand Up @@ -220,7 +221,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime t
return err
}

if allMatrixSamplesWithinRecentSampleWindow(expected, queryEvaluationTime, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, queryEvaluationTime, opts) {
if allMatrixSamplesOutsideComparableWindow(expected, queryEvaluationTime, opts) && allMatrixSamplesOutsideComparableWindow(actual, queryEvaluationTime, opts) {
return nil
}

Expand Down Expand Up @@ -250,6 +251,19 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime t
}

func compareMatrixSamples(expected, actual *model.SampleStream, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
expected.Values = trimBeginning(expected.Values, func(p model.SamplePair) bool {
return p.Timestamp < opts.SkipSamplesBefore
})
actual.Values = trimBeginning(actual.Values, func(p model.SamplePair) bool {
return p.Timestamp < opts.SkipSamplesBefore
})
expected.Histograms = trimBeginning(expected.Histograms, func(p model.SampleHistogramPair) bool {
return p.Timestamp < opts.SkipSamplesBefore
})
actual.Histograms = trimBeginning(actual.Histograms, func(p model.SampleHistogramPair) bool {
return p.Timestamp < opts.SkipSamplesBefore
})

expectedSamplesTail, actualSamplesTail, err := comparePairs(expected.Values, actual.Values, func(p1 model.SamplePair, p2 model.SamplePair) error {
err := compareSamplePair(p1, p2, queryEvaluationTime, opts)
if err != nil {
Expand Down Expand Up @@ -281,15 +295,15 @@ func compareMatrixSamples(expected, actual *model.SampleStream, queryEvaluationT
return nil
}

skipAllRecentFloatSamples := canSkipAllSamples(func(p model.SamplePair) bool {
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0
skipAllFloatSamples := canSkipAllSamples(func(p model.SamplePair) bool {
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0 || p.Timestamp < opts.SkipSamplesBefore
}, expectedSamplesTail, actualSamplesTail)

skipAllRecentHistogramSamples := canSkipAllSamples(func(p model.SampleHistogramPair) bool {
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0
skipAllHistogramSamples := canSkipAllSamples(func(p model.SampleHistogramPair) bool {
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0 || p.Timestamp < opts.SkipSamplesBefore
}, expectedHistogramSamplesTail, actualHistogramSamplesTail)

if skipAllRecentFloatSamples && skipAllRecentHistogramSamples {
if skipAllFloatSamples && skipAllHistogramSamples {
return nil
}

Expand Down Expand Up @@ -345,6 +359,28 @@ func comparePairs[S ~[]M, M any](s1, s2 S, compare func(M, M) error) (S, S, erro
return s1[i:], s2[i:], nil
}

// trimBeginning returns s without the prefix that satisfies skip().
func trimBeginning[S ~[]M, M any](s S, skip func(M) bool) S {
var i int
for ; i < len(s); i++ {
if !skip(s[i]) {
break
}
}
return s[i:]
}

// filterSlice returns a new slice with elements from s removed that satisfy skip().
func filterSlice[S ~[]M, M any](s S, skip func(M) bool) S {
res := make(S, 0, len(s))
for i := 0; i < len(s); i++ {
if !skip(s[i]) {
res = append(res, s[i])
}
}
return res
}

func canSkipAllSamples[S ~[]M, M any](skip func(M) bool, ss ...S) bool {
for _, s := range ss {
for _, p := range s {
Expand All @@ -356,20 +392,22 @@ func canSkipAllSamples[S ~[]M, M any](skip func(M) bool, ss ...S) bool {
return true
}

func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 {
func allMatrixSamplesOutsideComparableWindow(m model.Matrix, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 && opts.SkipSamplesBefore == 0 {
return false
}

for _, series := range m {
for _, sample := range series.Values {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
st := sample.Timestamp
if queryEvaluationTime.Sub(st.Time()) > opts.SkipRecentSamples && st >= opts.SkipSamplesBefore {
return false
}
}

for _, sample := range series.Histograms {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
st := sample.Timestamp
if queryEvaluationTime.Sub(st.Time()) > opts.SkipRecentSamples && st >= opts.SkipSamplesBefore {
return false
}
}
Expand All @@ -378,7 +416,7 @@ func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, queryEvaluationTim
return true
}

func compareVector(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
func compareVector(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) (retErr error) {
var expected, actual model.Vector

err := json.Unmarshal(expectedRaw, &expected)
Expand All @@ -391,10 +429,36 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime t
return err
}

if allVectorSamplesWithinRecentSampleWindow(expected, queryEvaluationTime, opts) && allVectorSamplesWithinRecentSampleWindow(actual, queryEvaluationTime, opts) {
if allVectorSamplesOutsideComparableWindow(expected, queryEvaluationTime, opts) && allVectorSamplesOutsideComparableWindow(actual, queryEvaluationTime, opts) {
return nil
}

if opts.SkipSamplesBefore > 0 {
// Warning: filtering samples can give out confusing error messages. For example if the actual response had
// matching series, but all sample timestamps were before SkipSamplesBefore, while expected response had samples
// after SkipSamplesBefore: instead of saying mismatch, it will instead say that some metrics is missing, because
// we filter them here.
eOrgLen, aOrgLen := len(expected), len(actual)
expected = filterSlice(expected, func(p *model.Sample) bool { return p.Timestamp < opts.SkipSamplesBefore })
actual = filterSlice(actual, func(p *model.Sample) bool { return p.Timestamp < opts.SkipSamplesBefore })
eChanged, aChanged := len(expected) != eOrgLen, len(actual) != aOrgLen
defer func() {
if retErr != nil {
warning := ""
if eChanged && aChanged {
warning = " (also, some samples were filtered out from the expected and actual response due to the 'skip samples before'; if all samples have been filtered out, this could cause the check on the expected number of metrics to fail)"
} else if aChanged {
warning = " (also, some samples were filtered out from the actual response due to the 'skip samples before'; if all samples have been filtered out, this could cause the check on the expected number of metrics to fail)"
} else if eChanged {
warning = " (also, some samples were filtered out from the expected response due to the 'skip samples before'; if all samples have been filtered out, this could cause the check on the expected number of metrics to fail)"
}
if warning != "" {
retErr = fmt.Errorf("%w%s", retErr, warning)
}
}
}()
}

if len(expected) != len(actual) {
return fmt.Errorf("expected %d metrics but got %d", len(expected), len(actual))
}
Expand Down Expand Up @@ -454,13 +518,14 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime t
return nil
}

func allVectorSamplesWithinRecentSampleWindow(v model.Vector, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 {
func allVectorSamplesOutsideComparableWindow(v model.Vector, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 && opts.SkipSamplesBefore == 0 {
return false
}

for _, sample := range v {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
st := sample.Timestamp
if queryEvaluationTime.Sub(st.Time()) > opts.SkipRecentSamples && st >= opts.SkipSamplesBefore {
return false
}
}
Expand Down Expand Up @@ -495,6 +560,12 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime t
}

func compareSamplePair(expected, actual model.SamplePair, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
// If the timestamp is before the configured SkipSamplesBefore then we don't even check if the timestamp is correct.
// The reason is that the SkipSamplesBefore feature may be used to compare queries hitting a different storage and one of two storages has no historical data.
if expected.Timestamp < opts.SkipSamplesBefore && actual.Timestamp < opts.SkipSamplesBefore {
return nil
}
codesome marked this conversation as resolved.
Show resolved Hide resolved

if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}
Expand Down Expand Up @@ -523,6 +594,10 @@ func compareSampleValue(first, second float64, opts SampleComparisonOptions) boo
}

func compareSampleHistogramPair(expected, actual model.SampleHistogramPair, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
if expected.Timestamp < opts.SkipSamplesBefore && actual.Timestamp < opts.SkipSamplesBefore {
return nil
}

if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}
Expand Down
Loading
Loading