Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compatibility query fuzz test with results cache enabled #6313

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,23 @@ func NewClient(
alertmanagerAddress string,
rulerAddress string,
orgID string,
) (*Client, error) {
return NewClientWithByPassResultsCache(distributorAddress, querierAddress, alertmanagerAddress, rulerAddress, orgID, false)
}

// NewClientWithByPassResultsCache makes a new Cortex client but allows to bypass query results cache if skipResultsCache set to true.
func NewClientWithByPassResultsCache(
distributorAddress string,
querierAddress string,
alertmanagerAddress string,
rulerAddress string,
orgID string,
skipResultsCache bool,
) (*Client, error) {
// Create querier API client
querierAPIClient, err := promapi.NewClient(promapi.Config{
Address: "http://" + querierAddress + "/api/prom",
RoundTripper: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
RoundTripper: &cacheControlRoundTripper{skipResultsCache: skipResultsCache, next: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport}},
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -587,6 +599,19 @@ func (r *addOrgIDRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
return r.next.RoundTrip(req)
}

type cacheControlRoundTripper struct {
skipResultsCache bool
next http.RoundTripper
}

func (r *cacheControlRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if r.skipResultsCache {
req.Header.Set("Cache-Control", "no-store")
}

return r.next.RoundTrip(req)
}

// ServerStatus represents a Alertmanager status response
// TODO: Upgrade to Alertmanager v0.20.0+ and utilize vendored structs
type ServerStatus struct {
Expand Down
187 changes: 181 additions & 6 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestDisableChunkTrimmingFuzz(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, now)
waitUntilBothServersReady(t, context.Background(), c1, c2, `{job="test"}`, start, now)

rnd := rand.New(rand.NewSource(now.Unix()))
opts := []promqlsmith.Option{
Expand Down Expand Up @@ -308,7 +309,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, end)
waitUntilBothServersReady(t, context.Background(), c1, c2, `{job="test"}`, start, end)

rnd := rand.New(rand.NewSource(now.Unix()))
opts := []promqlsmith.Option{
Expand Down Expand Up @@ -858,7 +859,7 @@ func TestBackwardCompatibilityQueryFuzz(t *testing.T) {
require.Equal(t, 200, res.StatusCode)

ctx := context.Background()
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
waitUntilBothServersReady(t, ctx, c1, c2, `{job="test"}`, start, end)

rnd := rand.New(rand.NewSource(now.Unix()))
opts := []promqlsmith.Option{
Expand Down Expand Up @@ -972,7 +973,7 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
require.NoError(t, err)

waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
waitUntilBothServersReady(t, ctx, c1, c2, `{job="test"}`, start, end)

opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
Expand All @@ -984,8 +985,182 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 100)
}

// waitUntilReady is a helper function to wait and check if both servers to test load the expected data.
func waitUntilReady(t *testing.T, ctx context.Context, c1, c2 *e2ecortex.Client, query string, start, end time.Time) {
// TestResultsCacheBackwardCompatibilityQueryFuzz performs following steps:
// 1. Run a range query with time [start, start + 30m] against a Cortex container with previous release image to fill results cache.
// 2. Run the same range query with time [start, end] against a Cortex container with the latest image.
// 3. Run the same range query with time [start, end] but with results cache bypassed and compare the query result got from last step.
func TestResultsCacheBackwardCompatibilityQueryFuzz(t *testing.T) {
// TODO: expose the image tag to be passed from Makefile or Github Action Config.
previousCortexReleaseImage := "quay.io/cortexproject/cortex:v1.18.1"
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul1 := e2edb.NewConsulWithName("consul-1")
consul2 := e2edb.NewConsulWithName("consul-2")
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul1, consul2, memcached))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "24h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-frontend.query-vertical-shard-size": "2",
"-frontend.max-cache-freshness": "1m",
// Enable results cache.
"-querier.cache-results": "true",
"-querier.split-queries-by-interval": "24h",
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
},
)
flags1 := mergeFlags(flags, map[string]string{
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
})
flags2 := mergeFlags(flags, map[string]string{
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
})
// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, previousCortexReleaseImage)
cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "yeya24/cortex:disable-chunk-trimming-0663e40bc-arm64")
require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

// Check if we're discovering memcache or not.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers"))

now := time.Now()
start := now.Add(-time.Hour * 2)
end := now.Add(-time.Hour)
numSeries := 10
numSamples := 60
lbls := make([]labels.Labels, 0, numSeries*2)
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_a"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa(i % 3)},
{Name: "status_code", Value: statusCodes[i%5]},
})

lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_b"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
{Name: "status_code", Value: statusCodes[(i+1)%5]},
})
}

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))

dir := filepath.Join(s.SharedDir(), "data")
err = os.MkdirAll(dir, os.ModePerm)
require.NoError(t, err)
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

c1, err := e2ecortex.NewClient("", cortex1.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
c2, err := e2ecortex.NewClient("", cortex2.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
c2ByPassCache, err := e2ecortex.NewClientWithByPassResultsCache("", cortex2.HTTPEndpoint(), "", "", "user-1", true)
require.NoError(t, err)

waitUntilBothServersReady(t, context.Background(), c1, c2, `{job="test"}`, start, now)

opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnabledFunctions(enabledFunctions),
}
ps := promqlsmith.New(rnd, lbls, opts...)

type testCase struct {
query string
res1, res2 model.Value
err1, err2 error
}

run := 100
step := 5 * time.Minute
cases := make([]*testCase, 0, run)
var (
expr parser.Expr
query string
)
for i := 0; i < run; i++ {
for {
expr = ps.WalkRangeQuery()
if isValidQuery(expr, 5) {
query = expr.Pretty(0)
break
}
}
// We don't care the query results here. Just to fill the cache with partial result.
_, _ = c1.QueryRange(query, start.Add(-2*step), start.Add(time.Minute*30), step)
cases = append(cases, &testCase{
query: query,
})
}

for i := 0; i < run; i++ {
// If not bypassing cache, we expect this query to fetch part of the query results as well as reusing some existing cached result.
cases[i].res1, cases[i].err1 = c2.QueryRange(cases[i].query, start, end, step)
cases[i].res2, cases[i].err2 = c2ByPassCache.QueryRange(cases[i].query, start, end, step)
}

failures := 0
qt := "range query"
for i, tc := range cases {
if tc.err1 != nil || tc.err2 != nil {
if !cmp.Equal(tc.err1, tc.err2) {
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
failures++
}
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
}
}
if failures > 0 {
require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
}
}

// waitUntilBothServersReady is a helper function to wait and check if both servers to test load the expected data.
func waitUntilBothServersReady(t *testing.T, ctx context.Context, c1, c2 *e2ecortex.Client, query string, start, end time.Time) {
retries := backoff.New(ctx, backoff.Config{
MinBackoff: 5 * time.Second,
MaxBackoff: 10 * time.Second,
Expand Down
Loading