diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ddc55794d8..16c92bba1d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -86,6 +86,7 @@ * [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666 * [BUGFIX] Fix issue when using the experimental `-ruler.max-independent-rule-evaluation-concurrency` feature, where the ruler could panic as it updates a running ruleset or shutdowns. #9726 * [BUGFIX] Always return unknown hint for first sample in non-gauge native histograms chunk to avoid incorrect counter reset hints when merging chunks from different sources. #10033 +* [BUGFIX] Ensure native histograms counter reset hints are corrected when merging results from different sources. #9909 * [BUGFIX] Ingester: Fix race condition in per-tenant TSDB creation. #9708 * [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765 * [BUGFIX] Ingester: Fix race condition in native histogram appending. #9765 diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index fea6b13f764..e92b19b7f75 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -44,6 +44,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -5749,6 +5750,182 @@ func TestIngester_QueryExemplars(t *testing.T) { }) } +// This test shows a single ingester returns compacted OOO and in-order chunks separately after compaction, even if they overlap. +func TestIngester_QueryStream_CounterResets(t *testing.T) { + // Create ingester. + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Hour // Long enough to not be reached during the test. + cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout = 1 * time.Second + cfg.BlocksStorageConfig.TSDB.HeadCompactionIntervalJitterEnabled = false + cfg.TSDBConfigUpdatePeriod = 1 * time.Second + + // Set the OOO window to 30 minutes and enable native histograms. + limits := map[string]*validation.Limits{ + userID: { + OutOfOrderTimeWindow: model.Duration(30 * time.Minute), + OOONativeHistogramsIngestionEnabled: true, + NativeHistogramsIngestionEnabled: true, + }, + } + override, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits)) + require.NoError(t, err) + + i, err := prepareIngesterWithBlockStorageAndOverrides(t, cfg, override, nil, "", "", nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's healthy. + test.Poll(t, 1*time.Second, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) + + // Push series. 
+ ctx := user.InjectOrgID(context.Background(), userID) + + histLbls := labels.FromStrings(labels.MetricName, "foo", "series_id", strconv.Itoa(0), "type", "histogram") + histReq := mockHistogramWriteRequest(histLbls, int64(0), 4, false) + _, err = i.Push(ctx, histReq) + require.NoError(t, err) + + histReq = mockHistogramWriteRequest(histLbls, int64(2), 6, false) + _, err = i.Push(ctx, histReq) + require.NoError(t, err) + + histReq = mockHistogramWriteRequest(histLbls, int64(4), 8, false) + _, err = i.Push(ctx, histReq) + require.NoError(t, err) + + histReq = mockHistogramWriteRequest(histLbls, int64(1), 2, false) + _, err = i.Push(ctx, histReq) + require.NoError(t, err) + + histReq = mockHistogramWriteRequest(histLbls, int64(3), 3, false) + _, err = i.Push(ctx, histReq) + require.NoError(t, err) + + // Create a GRPC server used to query back the data. + serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) + defer serv.GracefulStop() + client.RegisterIngesterServer(serv, i) + + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + go func() { + require.NoError(t, serv.Serve(listener)) + }() + + inst := ring.InstanceDesc{Id: "test", Addr: listener.Addr().String()} + c, err := client.MakeIngesterClient(inst, defaultClientTestConfig(), client.NewMetrics(nil)) + require.NoError(t, err) + defer c.Close() + + runQuery := func() ([]chunkenc.CounterResetHeader, [][]sample) { + s, err := c.QueryStream(ctx, &client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: 5, + + Matchers: []*client.LabelMatcher{{ + Type: client.EQUAL, + Name: model.MetricNameLabel, + Value: "foo", + }}, + }) + require.NoError(t, err) + + recvMsgs := 0 + + chunks := []client.Chunk{} + for { + resp, err := s.Recv() + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + for _, c := range resp.Chunkseries { + chunks = append(chunks, c.Chunks...) + } + recvMsgs++ + } + + require.Equal(t, recvMsgs, 1) + // Sort chunks by time + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].StartTimestampMs < chunks[j].StartTimestampMs + }) + + headers := []chunkenc.CounterResetHeader{} + var samples [][]sample + for _, c := range chunks { + require.Equal(t, c.Encoding, int32(chunk.PrometheusHistogramChunk)) + chk, err := chunkenc.FromData(chunkenc.EncHistogram, c.Data) + require.NoError(t, err) + + s := []sample{} + it := chk.Iterator(nil) + for it.Next() != chunkenc.ValNone { + ts, h := it.AtHistogram(nil) + s = append(s, sample{t: ts, h: h}) + } + samples = append(samples, s) + headers = append(headers, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + return headers, samples + } + + // Check samples before compaction (OOO and in-order samples are merged when both are in the head). 
+ actHeaders, actSamples := runQuery() + require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset, chunkenc.CounterReset}, actHeaders) + require.Equal(t, [][]sample{ + { + {t: 0, h: histogramWithHint(4, histogram.UnknownCounterReset)}, + }, + { + {t: 1, h: histogramWithHint(2, histogram.UnknownCounterReset)}, + {t: 2, h: histogramWithHint(6, histogram.NotCounterReset)}, + }, + { + {t: 3, h: histogramWithHint(3, histogram.UnknownCounterReset)}, + {t: 4, h: histogramWithHint(8, histogram.NotCounterReset)}, + }, + }, actSamples) + + time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter))) + + // Compaction + i.compactBlocks(context.Background(), false, 0, nil) // Should be compacted because the TSDB is idle. + verifyCompactedHead(t, i, true) + + defer c.Close() + + actHeaders, actSamples = runQuery() + require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, actHeaders) + require.Equal(t, [][]sample{ + { + {t: 0, h: histogramWithHint(4, histogram.UnknownCounterReset)}, + {t: 2, h: histogramWithHint(6, histogram.NotCounterReset)}, + {t: 4, h: histogramWithHint(8, histogram.NotCounterReset)}, + }, + { + {t: 1, h: histogramWithHint(2, histogram.UnknownCounterReset)}, + {t: 3, h: histogramWithHint(3, histogram.NotCounterReset)}, + }, + }, actSamples) +} + +func histogramWithHint(idx int, hint histogram.CounterResetHint) *histogram.Histogram { + h := util_test.GenerateTestHistogram(idx) + h.CounterResetHint = hint + return h +} + +type sample struct { + t int64 + h *histogram.Histogram +} + func writeRequestSingleSeries(lbls labels.Labels, samples []mimirpb.Sample) *mimirpb.WriteRequest { req := &mimirpb.WriteRequest{ Source: mimirpb.API, diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 99f8143935e..d28b227cf70 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -6,6 +6,7 @@ package batch import ( + "slices" "strconv" "testing" "time" @@ -82,7 +83,15 @@ func mkGenericChunk(t require.TestingT, from model.Time, points int, encoding ch return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator) } -func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding) { +type testBatchOptions uint + +const ( + // setNotCounterResetHintsAsUnknown can be used in cases where it's onerous to generate all the expected counter + // reset hints (e.g. merging lots of chunks together). 
+ setNotCounterResetHintsAsUnknown testBatchOptions = iota +) + +func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding, opts ...testBatchOptions) { nextExpectedTS := model.TimeFromUnix(0) var assertPoint func(i int) switch encoding { @@ -100,8 +109,15 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c ts, h := iter.AtHistogram(nil) require.EqualValues(t, int64(nextExpectedTS), ts, strconv.Itoa(i)) expH := test.GenerateTestHistogram(int(nextExpectedTS)) - if nextExpectedTS > 0 { - expH.CounterResetHint = histogram.NotCounterReset + if slices.Contains(opts, setNotCounterResetHintsAsUnknown) { + if h.CounterResetHint == histogram.NotCounterReset { + h.CounterResetHint = histogram.UnknownCounterReset + expH.CounterResetHint = histogram.UnknownCounterReset + } + } else { + if nextExpectedTS > 0 { + expH.CounterResetHint = histogram.NotCounterReset + } } test.RequireHistogramEqual(t, expH, h, strconv.Itoa(i)) nextExpectedTS = nextExpectedTS.Add(step) @@ -112,8 +128,15 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c ts, fh := iter.AtFloatHistogram(nil) require.EqualValues(t, int64(nextExpectedTS), ts, strconv.Itoa(i)) expFH := test.GenerateTestFloatHistogram(int(nextExpectedTS)) - if nextExpectedTS > 0 { - expFH.CounterResetHint = histogram.NotCounterReset + if slices.Contains(opts, setNotCounterResetHintsAsUnknown) { + if fh.CounterResetHint == histogram.NotCounterReset { + fh.CounterResetHint = histogram.UnknownCounterReset + expFH.CounterResetHint = histogram.UnknownCounterReset + } + } else { + if nextExpectedTS > 0 { + expFH.CounterResetHint = histogram.NotCounterReset + } } test.RequireFloatHistogramEqual(t, expFH, fh, strconv.Itoa(i)) nextExpectedTS = nextExpectedTS.Add(step) @@ -127,7 +150,7 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c require.Equal(t, chunkenc.ValNone, iter.Next()) } -func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding) { +func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding, opts ...testBatchOptions) { var assertPoint func(expectedTS int64, valType chunkenc.ValueType) switch encoding { case chunk.PrometheusXorChunk: @@ -144,8 +167,15 @@ func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding c ts, h := iter.AtHistogram(nil) require.EqualValues(t, expectedTS, ts) expH := test.GenerateTestHistogram(int(expectedTS)) - if expectedTS > 0 { - expH.CounterResetHint = histogram.NotCounterReset + if slices.Contains(opts, setNotCounterResetHintsAsUnknown) { + if h.CounterResetHint == histogram.NotCounterReset { + h.CounterResetHint = histogram.UnknownCounterReset + expH.CounterResetHint = histogram.UnknownCounterReset + } + } else { + if expectedTS > 0 { + expH.CounterResetHint = histogram.NotCounterReset + } } test.RequireHistogramEqual(t, expH, h) require.NoError(t, iter.Err()) @@ -156,8 +186,15 @@ func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding c ts, fh := iter.AtFloatHistogram(nil) require.EqualValues(t, expectedTS, ts) expFH := test.GenerateTestFloatHistogram(int(expectedTS)) - if expectedTS > 0 { - expFH.CounterResetHint = histogram.NotCounterReset + if slices.Contains(opts, setNotCounterResetHintsAsUnknown) { + if fh.CounterResetHint == histogram.NotCounterReset { + fh.CounterResetHint = histogram.UnknownCounterReset + expFH.CounterResetHint = histogram.UnknownCounterReset + } + } else { + if 
expectedTS > 0 { + expFH.CounterResetHint = histogram.NotCounterReset + } } test.RequireFloatHistogramEqual(t, expFH, fh) require.NoError(t, iter.Err()) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index f723eed89a8..4642f1da508 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -50,7 +50,7 @@ func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator { c.batches = newBatchStream(len(c.its), &c.hPool, &c.fhPool) } for i, cs := range css { - c.its[i] = newNonOverlappingIterator(c.its[i], cs, &c.hPool, &c.fhPool) + c.its[i] = newNonOverlappingIterator(c.its[i], i, cs, &c.hPool, &c.fhPool) } for _, iter := range c.its { @@ -138,7 +138,7 @@ func (c *mergeIterator) buildNextBatch(size int) chunkenc.ValueType { // is before all iterators next entry. for len(c.h) > 0 && (c.batches.len() == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) { batch := c.h[0].Batch() - c.batches.merge(&batch, size) + c.batches.merge(&batch, size, c.h[0].id) if c.h[0].Next(size) != chunkenc.ValNone { heap.Fix(&c.h, 0) @@ -165,7 +165,7 @@ func (c *mergeIterator) Err() error { return c.currErr } -type iteratorHeap []iterator +type iteratorHeap []*nonOverlappingIterator func (h *iteratorHeap) Len() int { return len(*h) } func (h *iteratorHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } @@ -177,7 +177,7 @@ func (h *iteratorHeap) Less(i, j int) bool { } func (h *iteratorHeap) Push(x interface{}) { - *h = append(*h, x.(iterator)) + *h = append(*h, x.(*nonOverlappingIterator)) } func (h *iteratorHeap) Pop() interface{} { diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index db8e80453a6..da146aa22e6 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -10,11 +10,13 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/storage/chunk" + "github.com/grafana/mimir/pkg/util/test" ) func TestMergeIter(t *testing.T) { @@ -27,15 +29,15 @@ func TestMergeIter(t *testing.T) { chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc) iter := NewGenericChunkMergeIterator(nil, labels.EmptyLabels(), []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testIter(t, 200, iter, enc) + testIter(t, 200, iter, enc, setNotCounterResetHintsAsUnknown) iter = NewGenericChunkMergeIterator(nil, labels.EmptyLabels(), []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testSeek(t, 200, iter, enc) + testSeek(t, 200, iter, enc, setNotCounterResetHintsAsUnknown) // Re-use iterator. 
iter = NewGenericChunkMergeIterator(iter, labels.EmptyLabels(), []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testIter(t, 200, iter, enc) + testIter(t, 200, iter, enc, setNotCounterResetHintsAsUnknown) iter = NewGenericChunkMergeIterator(iter, labels.EmptyLabels(), []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testSeek(t, 200, iter, enc) + testSeek(t, 200, iter, enc, setNotCounterResetHintsAsUnknown) }) } } @@ -56,10 +58,249 @@ func TestMergeHarder(t *testing.T) { from = from.Add(time.Duration(offset) * time.Second) } iter := newMergeIterator(nil, chunks) - testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(nil, iter, labels.EmptyLabels()), enc) + testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(nil, iter, labels.EmptyLabels()), enc, setNotCounterResetHintsAsUnknown) iter = newMergeIterator(nil, chunks) - testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(nil, iter, labels.EmptyLabels()), enc) + testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(nil, iter, labels.EmptyLabels()), enc, setNotCounterResetHintsAsUnknown) + }) + } +} + +type checkHintTestSample struct { + t int64 + v int + hint histogram.CounterResetHint + isFloat bool +} + +func TestMergeHistogramCheckHints(t *testing.T) { + for _, enc := range []chunk.Encoding{chunk.PrometheusHistogramChunk, chunk.PrometheusFloatHistogramChunk} { + addFunc := func(chk chunk.EncodedChunk, ts, val int) { + switch chk.Encoding() { + case chunk.PrometheusXorChunk: + _, err := chk.Add(model.SamplePair{Timestamp: model.Time(ts), Value: model.SampleValue(val)}) + require.NoError(t, err) + case chunk.PrometheusHistogramChunk: + overflow, err := chk.AddHistogram(int64(ts), test.GenerateTestHistogram(val)) + require.NoError(t, err) + require.Nil(t, overflow) + case chunk.PrometheusFloatHistogramChunk: + overflow, err := chk.AddFloatHistogram(int64(ts), test.GenerateTestFloatHistogram(val)) + require.NoError(t, err) + require.Nil(t, overflow) + default: + require.FailNow(t, "unhandled encoding type") + } + } + + t.Run(enc.String(), func(t *testing.T) { + for _, tc := range []struct { + name string + chunks []GenericChunk + expectedSamples []checkHintTestSample + }{ + { + name: "no overlapping iterators", + chunks: []GenericChunk{ + mkGenericChunk(t, 0, 5, enc), + mkGenericChunk(t, model.TimeFromUnix(5), 5, enc), + }, + expectedSamples: []checkHintTestSample{ + {t: 0, v: 0, hint: histogram.UnknownCounterReset}, + {t: 1000, v: 1000, hint: histogram.NotCounterReset}, + {t: 2000, v: 2000, hint: histogram.NotCounterReset}, + {t: 3000, v: 3000, hint: histogram.NotCounterReset}, + {t: 4000, v: 4000, hint: histogram.NotCounterReset}, + {t: 5000, v: 5000, hint: histogram.UnknownCounterReset}, + {t: 6000, v: 6000, hint: histogram.NotCounterReset}, + {t: 7000, v: 7000, hint: histogram.NotCounterReset}, + {t: 8000, v: 8000, hint: histogram.NotCounterReset}, + {t: 9000, v: 9000, hint: histogram.NotCounterReset}, + }, + }, + { + name: "duplicated chunks", + chunks: []GenericChunk{ + mkGenericChunk(t, 0, 10, enc), + mkGenericChunk(t, 0, 10, enc), + }, + expectedSamples: []checkHintTestSample{ + {t: 0, v: 0, hint: histogram.UnknownCounterReset}, // 1 sample from c0 + {t: 1000, v: 1000, hint: histogram.UnknownCounterReset}, // 1 sample from c1 + {t: 2000, v: 2000, hint: histogram.UnknownCounterReset}, // 2 samples from c0 + {t: 3000, v: 3000, hint: histogram.NotCounterReset}, + {t: 4000, v: 4000, hint: histogram.UnknownCounterReset}, // 4 samples from c1 + {t: 5000, v: 5000, hint: 
histogram.NotCounterReset}, + {t: 6000, v: 6000, hint: histogram.NotCounterReset}, + {t: 7000, v: 7000, hint: histogram.NotCounterReset}, + {t: 8000, v: 8000, hint: histogram.UnknownCounterReset}, // 2 samples from c0 + {t: 9000, v: 9000, hint: histogram.NotCounterReset}, + }, + }, + { + name: "overlapping chunks", + chunks: []GenericChunk{ + mkGenericChunk(t, 0, 11, enc), + mkGenericChunk(t, 3000, 7, enc), + }, + expectedSamples: []checkHintTestSample{ + {t: 0, v: 0, hint: histogram.UnknownCounterReset}, // 1 sample from c0 + {t: 1000, v: 1000, hint: histogram.NotCounterReset}, // 1 sample from c0, previous iterator was also c0, so keep the NCR hint + {t: 2000, v: 2000, hint: histogram.NotCounterReset}, // 2 samples from c0, previous iterator was also c0, so keep the NCR hint + {t: 3000, v: 3000, hint: histogram.NotCounterReset}, + {t: 4000, v: 4000, hint: histogram.UnknownCounterReset}, // 4 samples from c1 + {t: 5000, v: 5000, hint: histogram.NotCounterReset}, + {t: 6000, v: 6000, hint: histogram.NotCounterReset}, + {t: 7000, v: 7000, hint: histogram.NotCounterReset}, + {t: 8000, v: 8000, hint: histogram.UnknownCounterReset}, // 3 samples from c1 + {t: 9000, v: 9000, hint: histogram.NotCounterReset}, + {t: 10000, v: 10000, hint: histogram.NotCounterReset}, + }, + }, + { + name: "different values at same timestamp", + chunks: func() []GenericChunk { + chk1, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + addFunc(chk1, 0, 0) + addFunc(chk1, 1, 1) + addFunc(chk1, 4, 2) + addFunc(chk1, 5, 3) + addFunc(chk1, 7, 4) + chk2, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + addFunc(chk2, 0, 7) + addFunc(chk2, 1, 8) + addFunc(chk2, 2, 9) + addFunc(chk2, 3, 10) + addFunc(chk2, 4, 11) + return []GenericChunk{ + NewGenericChunk(0, 7, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk1, 0, 7).Data.NewIterator), + NewGenericChunk(0, 4, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk2, 0, 4).Data.NewIterator), + } + }(), + expectedSamples: []checkHintTestSample{ + {t: 0, v: 0, hint: histogram.UnknownCounterReset}, // 1 sample from c1 + {t: 1, v: 8, hint: histogram.UnknownCounterReset}, // 1 sample from c2 + {t: 2, v: 9, hint: histogram.NotCounterReset}, // 2 samples from c2, previous iterator was also c2, so keep the NCR hint + {t: 3, v: 10, hint: histogram.NotCounterReset}, + {t: 4, v: 11, hint: histogram.NotCounterReset}, // 1 sample from c2, previous iterator was also c2, so keep the NCR hint, also end of iterator + {t: 5, v: 3, hint: histogram.UnknownCounterReset}, // 2 samples from c1 + {t: 7, v: 4, hint: histogram.NotCounterReset}, + }, + }, + { + name: "overlapping and interleaved samples", + chunks: func() []GenericChunk { + chk1, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + addFunc(chk1, 0, 0) + addFunc(chk1, 3, 1) + addFunc(chk1, 4, 2) + addFunc(chk1, 5, 3) + addFunc(chk1, 7, 4) + addFunc(chk1, 8, 5) + chk2, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + addFunc(chk2, 1, 1) + addFunc(chk2, 2, 7) + addFunc(chk2, 5, 8) + addFunc(chk2, 6, 9) + addFunc(chk2, 7, 10) + addFunc(chk2, 9, 11) + return []GenericChunk{ + NewGenericChunk(0, 8, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk1, 0, 8).Data.NewIterator), + NewGenericChunk(1, 9, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk2, 1, 9).Data.NewIterator), + } + }(), + expectedSamples: []checkHintTestSample{ + {t: 0, v: 0, hint: histogram.UnknownCounterReset}, // c1 + {t: 1, v: 1, hint: 
histogram.UnknownCounterReset}, // c2 + {t: 2, v: 7, hint: histogram.NotCounterReset}, // c2 + {t: 3, v: 1, hint: histogram.UnknownCounterReset}, // c1 + {t: 4, v: 2, hint: histogram.NotCounterReset}, // c1 + {t: 5, v: 8, hint: histogram.UnknownCounterReset}, // c2 + // Next sample is from c2. It is consecutive with the previous c2 sample, but it ends up as the first sample in a merged + // batch, and the c1 samples at ts 7 and 8 are added to the batch stream before this c2 sample, so + // the prevIteratorID is set to c1 rather than c2 when we append this sample. + {t: 6, v: 9, hint: histogram.UnknownCounterReset}, + {t: 7, v: 4, hint: histogram.UnknownCounterReset}, // c1 + {t: 8, v: 5, hint: histogram.NotCounterReset}, // c1 + {t: 9, v: 11, hint: histogram.UnknownCounterReset}, // c2 + }, + }, + { + name: "histogram and float samples", + chunks: func() []GenericChunk { + chk1, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + addFunc(chk1, 0, 0) + addFunc(chk1, 3, 1) + addFunc(chk1, 5, 2) + addFunc(chk1, 6, 3) + addFunc(chk1, 7, 4) + chk2, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + addFunc(chk2, 1, 1) + addFunc(chk2, 2, 7) + addFunc(chk2, 6, 8) + addFunc(chk2, 7, 9) + addFunc(chk2, 8, 10) + addFunc(chk2, 9, 11) + chk3, err := chunk.NewForEncoding(chunk.PrometheusXorChunk) + require.NoError(t, err) + addFunc(chk3, 4, 2) + return []GenericChunk{ + NewGenericChunk(0, 7, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk1, 0, 7).Data.NewIterator), + NewGenericChunk(1, 9, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk2, 1, 9).Data.NewIterator), + NewGenericChunk(4, 4, chunk.NewChunk(labels.FromStrings(model.MetricNameLabel, "foo"), chk3, 4, 4).Data.NewIterator), + } + }(), + expectedSamples: []checkHintTestSample{ + {t: 0, v: 0, hint: histogram.UnknownCounterReset}, // c1 + {t: 1, v: 1, hint: histogram.UnknownCounterReset}, // c2 + {t: 2, v: 7, hint: histogram.NotCounterReset}, // c2 + {t: 3, v: 1, hint: histogram.UnknownCounterReset}, // c1 + {t: 4, v: 2, isFloat: true}, // c3 + {t: 5, v: 2, hint: histogram.UnknownCounterReset}, // c1 + {t: 6, v: 8, hint: histogram.UnknownCounterReset}, // c2 + {t: 7, v: 4, hint: histogram.UnknownCounterReset}, // c1 + {t: 8, v: 10, hint: histogram.UnknownCounterReset}, // c2 + {t: 9, v: 11, hint: histogram.NotCounterReset}, // c2 + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + iter := NewGenericChunkMergeIterator(nil, labels.EmptyLabels(), tc.chunks) + for i, s := range tc.expectedSamples { + valType := iter.Next() + require.NotEqual(t, chunkenc.ValNone, valType, "expectedSamples has extra samples") + require.Nil(t, iter.Err()) + require.Equal(t, s.t, iter.AtT()) + switch valType { + case chunkenc.ValFloat: + require.True(t, s.isFloat) + _, v := iter.At() + require.Equal(t, float64(s.v), v) + case chunkenc.ValHistogram: + expH := test.GenerateTestHistogram(s.v) + expH.CounterResetHint = s.hint + _, actH := iter.AtHistogram(nil) + test.RequireHistogramEqual(t, expH, actH, "expected sample %d does not match", i) + case chunkenc.ValFloatHistogram: + expH := test.GenerateTestFloatHistogram(s.v) + expH.CounterResetHint = s.hint + _, actH := iter.AtFloatHistogram(nil) + test.RequireFloatHistogramEqual(t, expH, actH, "expected sample with idx %d does not match", i) + default: + t.Errorf("checkHints - internal error, unhandled expected type: %T", s) + } + } + require.Equal(t, chunkenc.ValNone, iter.Next(), "iter has extra samples") + require.Nil(t, iter.Err()) + }) + + } }) } } diff --git
a/pkg/querier/batch/non_overlapping.go b/pkg/querier/batch/non_overlapping.go index 03e35553444..ab0743ed9b0 100644 --- a/pkg/querier/batch/non_overlapping.go +++ b/pkg/querier/batch/non_overlapping.go @@ -17,14 +17,17 @@ type nonOverlappingIterator struct { curr int chunks []GenericChunk iter chunkIterator + // id is used to detect when the iterator has changed when merging + id int } // newNonOverlappingIterator returns a single iterator over a slice of sorted, // non-overlapping iterators. -func newNonOverlappingIterator(it *nonOverlappingIterator, chunks []GenericChunk, hPool *zeropool.Pool[*histogram.Histogram], fhPool *zeropool.Pool[*histogram.FloatHistogram]) *nonOverlappingIterator { +func newNonOverlappingIterator(it *nonOverlappingIterator, id int, chunks []GenericChunk, hPool *zeropool.Pool[*histogram.Histogram], fhPool *zeropool.Pool[*histogram.FloatHistogram]) *nonOverlappingIterator { if it == nil { it = &nonOverlappingIterator{} } + it.id = id it.chunks = chunks it.curr = 0 it.iter.hPool = hPool diff --git a/pkg/querier/batch/non_overlapping_test.go b/pkg/querier/batch/non_overlapping_test.go index bfe63813b0b..032d508733d 100644 --- a/pkg/querier/batch/non_overlapping_test.go +++ b/pkg/querier/batch/non_overlapping_test.go @@ -19,16 +19,16 @@ func TestNonOverlappingIter(t *testing.T) { for i := int64(0); i < 100; i++ { cs = append(cs, mkGenericChunk(t, model.TimeFromUnix(i*10), 10, chunk.PrometheusXorChunk)) } - testIter(t, 10*100, newIteratorAdapter(nil, newNonOverlappingIterator(nil, cs, nil, nil), labels.EmptyLabels()), chunk.PrometheusXorChunk) - it := newNonOverlappingIterator(nil, cs, nil, nil) + testIter(t, 10*100, newIteratorAdapter(nil, newNonOverlappingIterator(nil, 0, cs, nil, nil), labels.EmptyLabels()), chunk.PrometheusXorChunk) + it := newNonOverlappingIterator(nil, 0, cs, nil, nil) adapter := newIteratorAdapter(nil, it, labels.EmptyLabels()) testSeek(t, 10*100, adapter, chunk.PrometheusXorChunk) // Do the same operations while re-using the iterators. 
- it = newNonOverlappingIterator(it, cs, nil, nil) + it = newNonOverlappingIterator(it, 0, cs, nil, nil) adapter = newIteratorAdapter(adapter.(*iteratorAdapter), it, labels.EmptyLabels()) testIter(t, 10*100, adapter, chunk.PrometheusXorChunk) - it = newNonOverlappingIterator(it, cs, nil, nil) + it = newNonOverlappingIterator(it, 0, cs, nil, nil) adapter = newIteratorAdapter(adapter.(*iteratorAdapter), it, labels.EmptyLabels()) testSeek(t, 10*100, adapter, chunk.PrometheusXorChunk) } @@ -42,6 +42,6 @@ func TestNonOverlappingIterSparse(t *testing.T) { mkGenericChunk(t, model.TimeFromUnix(95), 1, chunk.PrometheusXorChunk), mkGenericChunk(t, model.TimeFromUnix(96), 4, chunk.PrometheusXorChunk), } - testIter(t, 100, newIteratorAdapter(nil, newNonOverlappingIterator(nil, cs, nil, nil), labels.EmptyLabels()), chunk.PrometheusXorChunk) - testSeek(t, 100, newIteratorAdapter(nil, newNonOverlappingIterator(nil, cs, nil, nil), labels.EmptyLabels()), chunk.PrometheusXorChunk) + testIter(t, 100, newIteratorAdapter(nil, newNonOverlappingIterator(nil, 0, cs, nil, nil), labels.EmptyLabels()), chunk.PrometheusXorChunk) + testSeek(t, 100, newIteratorAdapter(nil, newNonOverlappingIterator(nil, 0, cs, nil, nil), labels.EmptyLabels()), chunk.PrometheusXorChunk) } diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 0cbd67e41df..57c7031df64 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -20,6 +20,10 @@ type batchStream struct { batches []chunk.Batch batchesBuf []chunk.Batch + // prevIteratorID is the iterator id of the last sample appended to the batchStream from the last merge() call. + // This helps reduce the number of hints that are set to unknown across merge calls. + prevIteratorID int + hPool *zeropool.Pool[*histogram.Histogram] fhPool *zeropool.Pool[*histogram.FloatHistogram] } @@ -28,10 +32,11 @@ func newBatchStream(size int, hPool *zeropool.Pool[*histogram.Histogram], fhPool batches := make([]chunk.Batch, 0, size) batchesBuf := make([]chunk.Batch, size) return &batchStream{ - batches: batches, - batchesBuf: batchesBuf, - hPool: hPool, - fhPool: fhPool, + batches: batches, + batchesBuf: batchesBuf, + prevIteratorID: -1, + hPool: hPool, + fhPool: fhPool, } } @@ -58,6 +63,7 @@ func (bs *batchStream) empty() { bs.putPointerValuesToThePool(&bs.batches[i]) } bs.batches = bs.batches[:0] + bs.prevIteratorID = -1 } func (bs *batchStream) len() int { @@ -92,7 +98,7 @@ func (bs *batchStream) curr() *chunk.Batch { // merge merges this streams of chunk.Batch objects and the given chunk.Batch of the same series over time. // Samples are simply merged by time when they are the same type (float/histogram/...), with the left stream taking precedence if the timestamps are equal. // When sample are different type, batches are not merged. In case of equal timestamps, histograms take precedence since they have more information. -func (bs *batchStream) merge(batch *chunk.Batch, size int) { +func (bs *batchStream) merge(batch *chunk.Batch, size int, iteratorID int) { // We store this at the beginning to avoid additional allocations. 
// Namely, the merge method will go through all the batches from bs.batch, // check whether their elements should be kept (and copy them to the result) @@ -129,7 +135,9 @@ func (bs *batchStream) merge(batch *chunk.Batch, size int) { b.ValueType = valueType } - populate := func(batch *chunk.Batch, valueType chunkenc.ValueType) { + prevIteratorID := bs.prevIteratorID + + populate := func(batch *chunk.Batch, valueType chunkenc.ValueType, itID int) { if b.Index == 0 { // Starting to write this Batch, it is safe to set the value type b.ValueType = valueType @@ -144,26 +152,51 @@ func (bs *batchStream) merge(batch *chunk.Batch, size int) { b.Timestamps[b.Index], b.Values[b.Index] = batch.At() case chunkenc.ValHistogram: b.Timestamps[b.Index], b.PointerValues[b.Index] = batch.AtHistogram() + if itID == -1 { // This means the sample is already in the batch stream. We get its original iterator id. + itID = batch.GetIteratorID() + } + if prevIteratorID != itID && prevIteratorID != -1 { + // We switched non-overlapping iterators, so this sample may come + // from a different place or time and we should reset its hint. + h := (*histogram.Histogram)(b.PointerValues[b.Index]) + if h.CounterResetHint != histogram.GaugeType && h.CounterResetHint != histogram.UnknownCounterReset { + h.CounterResetHint = histogram.UnknownCounterReset + } + } + b.SetIteratorID(itID) case chunkenc.ValFloatHistogram: b.Timestamps[b.Index], b.PointerValues[b.Index] = batch.AtFloatHistogram() + if itID == -1 { // This means the sample is already in the batch stream. We get its original iterator id. + itID = batch.GetIteratorID() + } + if prevIteratorID != itID && prevIteratorID != -1 { + // We switched non-overlapping iterators, so this sample may come + // from a different place or time and we should reset its hint. + h := (*histogram.FloatHistogram)(b.PointerValues[b.Index]) + if h.CounterResetHint != histogram.GaugeType && h.CounterResetHint != histogram.UnknownCounterReset { + h.CounterResetHint = histogram.UnknownCounterReset + } + } + b.SetIteratorID(itID) } + prevIteratorID = itID b.Index++ } for lt, rt := bs.hasNext(), batch.HasNext(); lt != chunkenc.ValNone && rt != chunkenc.ValNone; lt, rt = bs.hasNext(), batch.HasNext() { t1, t2 := bs.curr().AtTime(), batch.AtTime() if t1 < t2 { - populate(bs.curr(), lt) + populate(bs.curr(), lt, -1) bs.next() } else if t1 > t2 { - populate(batch, rt) + populate(batch, rt, iteratorID) batch.Next() } else { if (rt == chunkenc.ValHistogram || rt == chunkenc.ValFloatHistogram) && lt == chunkenc.ValFloat { // Prefer histograms than floats. Take left side if both have histograms. - populate(batch, rt) + populate(batch, rt, iteratorID) } else { - populate(bs.curr(), lt) + populate(bs.curr(), lt, -1) // if bs.hPool is not nil, we put there the discarded histogram.Histogram object from batch, so it can be reused. if rt == chunkenc.ValHistogram && bs.hPool != nil { _, h := batch.AtHistogram() @@ -181,12 +214,12 @@ func (bs *batchStream) merge(batch *chunk.Batch, size int) { } for t := bs.hasNext(); t != chunkenc.ValNone; t = bs.hasNext() { - populate(bs.curr(), t) + populate(bs.curr(), t, -1) bs.next() } for t := batch.HasNext(); t != chunkenc.ValNone; t = batch.HasNext() { - populate(batch, t) + populate(batch, t, iteratorID) batch.Next() } @@ -194,6 +227,9 @@ func (bs *batchStream) merge(batch *chunk.Batch, size int) { // has to be appended, hence it tells the length. b.Length = b.Index + // Store the last iterator id.
+ bs.prevIteratorID = prevIteratorID + bs.batches = append(origBatches, bs.batchesBuf[:resultLen]...) bs.reset() } diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index 14b28b141a2..74d58a50806 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -92,7 +92,7 @@ func TestBatchStream_Merge(t *testing.T) { s.batches = tc.batches for i := range tc.newBatches { - s.merge(&tc.newBatches[i], chunk.BatchSize) + s.merge(&tc.newBatches[i], chunk.BatchSize, 0) } require.Equal(t, len(tc.output), len(s.batches)) diff --git a/pkg/querier/block_test.go b/pkg/querier/block_test.go index 1e7130c6a82..55ac10b5cd0 100644 --- a/pkg/querier/block_test.go +++ b/pkg/querier/block_test.go @@ -481,6 +481,39 @@ func createAggrChunk(minTime, maxTime int64, samples ...promql.FPoint) storepb.A } } +func createAggrChunkWithFloatHistogramSamples(samples ...promql.HPoint) storepb.AggrChunk { + return createAggrFloatHistogramChunk(samples[0].T, samples[len(samples)-1].T, samples...) +} + +func createAggrFloatHistogramChunk(minTime, maxTime int64, samples ...promql.HPoint) storepb.AggrChunk { + // Ensure samples are sorted by timestamp. + sort.Slice(samples, func(i, j int) bool { + return samples[i].T < samples[j].T + }) + + chunk := chunkenc.NewFloatHistogramChunk() + appender, err := chunk.Appender() + if err != nil { + panic(err) + } + + for _, s := range samples { + _, _, appender, err = appender.AppendFloatHistogram(nil, s.T, s.H, true) + if err != nil { + panic(err) + } + } + + return storepb.AggrChunk{ + MinTime: minTime, + MaxTime: maxTime, + Raw: storepb.Chunk{ + Type: storepb.Chunk_FloatHistogram, + Data: chunk.Bytes(), + }, + } +} + func mkZLabels(s ...string) []mimirpb.LabelAdapter { var result []mimirpb.LabelAdapter diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 70d8ddc488f..0f8dc3cfe8b 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -33,10 +33,12 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -81,8 +83,9 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) type valueResult struct { - t int64 - v float64 + t int64 + v float64 + fh *histogram.FloatHistogram } type seriesResult struct { @@ -1510,6 +1513,88 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ` }, }, + "histograms with counter resets in overlapping chunks": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 1, H: test.GenerateTestFloatHistogram(40)}, + promql.HPoint{T: minT + 7, H: test.GenerateTestFloatHistogram(50)}, + ), + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 2, H: 
test.GenerateTestFloatHistogram(20)}, + promql.HPoint{T: minT + 3, H: test.GenerateTestFloatHistogram(60)}, + promql.HPoint{T: minT + 4, H: test.GenerateTestFloatHistogram(70)}, + promql.HPoint{T: minT + 5, H: test.GenerateTestFloatHistogram(80)}, + promql.HPoint{T: minT + 6, H: test.GenerateTestFloatHistogram(90)}, + ), + mockHintsResponse(block1, block2), + mockStatsResponse(50), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: metricNameLabel, + values: []valueResult{ + {t: minT + 1, fh: test.GenerateTestFloatHistogram(40)}, + {t: minT + 2, fh: test.GenerateTestFloatHistogram(20)}, + {t: minT + 3, fh: tsdbutil.SetFloatHistogramNotCounterReset(test.GenerateTestFloatHistogram(60))}, + {t: minT + 4, fh: tsdbutil.SetFloatHistogramNotCounterReset(test.GenerateTestFloatHistogram(70))}, + {t: minT + 5, fh: tsdbutil.SetFloatHistogramNotCounterReset(test.GenerateTestFloatHistogram(80))}, + {t: minT + 6, fh: tsdbutil.SetFloatHistogramNotCounterReset(test.GenerateTestFloatHistogram(90))}, + {t: minT + 7, fh: test.GenerateTestFloatHistogram(50)}, + }, + }, + }, + }, + "histograms with counter resets with partially matching chunks": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 1, H: test.GenerateTestFloatHistogram(40)}, + ), + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 3, H: test.GenerateTestFloatHistogram(20)}, + ), + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 1, H: test.GenerateTestFloatHistogram(40)}, + ), + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 2, H: test.GenerateTestFloatHistogram(30)}, + ), + mockSeriesResponseWithFloatHistogramSamples(metricNameLabel, + promql.HPoint{T: minT + 3, H: test.GenerateTestFloatHistogram(20)}, + ), + mockHintsResponse(block1, block2), + mockStatsResponse(50), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: metricNameLabel, + values: []valueResult{ + {t: minT + 1, fh: test.GenerateTestFloatHistogram(40)}, + {t: minT + 2, fh: test.GenerateTestFloatHistogram(30)}, + {t: minT + 3, fh: test.GenerateTestFloatHistogram(20)}, + }, + }, + }, + }, } for testName, testData := range tests { @@ -1611,12 +1696,22 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { it = set.At().Iterator(it) for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() { - assert.Equal(t, valType, chunkenc.ValFloat) - t, v := it.At() - actualValues = append(actualValues, valueResult{ - t: t, - v: v, - }) + switch valType { + case chunkenc.ValFloat: + t, v := it.At() + actualValues = append(actualValues, valueResult{ + t: t, + v: v, + }) + case chunkenc.ValFloatHistogram: + t, fh := it.AtFloatHistogram(nil) + actualValues = append(actualValues, valueResult{ + t: t, + fh: fh, + }) + default: + require.FailNow(t, "unhandled type") + } } require.NoError(t, it.Err()) @@ -3133,6 +3228,10 @@ func mockSeriesResponseWithSamples(lbls labels.Labels, samples ...promql.FPoint) return mockSeriesResponseWithChunks(lbls, createAggrChunkWithSamples(samples...)) } +func 
mockSeriesResponseWithFloatHistogramSamples(lbls labels.Labels, samples ...promql.HPoint) *storepb.SeriesResponse { + return mockSeriesResponseWithChunks(lbls, createAggrChunkWithFloatHistogramSamples(samples...)) +} + func mockSeriesResponseWithChunks(lbls labels.Labels, chunks ...storepb.AggrChunk) *storepb.SeriesResponse { return &storepb.SeriesResponse{ Result: &storepb.SeriesResponse_Series{ diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index e9b3499ecc7..28a7775190a 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -241,8 +241,8 @@ func TestDistributorQuerier_Select_MixedChunkseriesTimeseriesAndStreamingResults } streamReader := createTestStreamReader([]client.QueryStreamSeriesChunks{ - {SeriesIndex: 0, Chunks: convertToChunks(t, samplesToInterface(s4))}, - {SeriesIndex: 1, Chunks: convertToChunks(t, samplesToInterface(s3))}, + {SeriesIndex: 0, Chunks: convertToChunks(t, samplesToInterface(s4), false)}, + {SeriesIndex: 1, Chunks: convertToChunks(t, samplesToInterface(s3), false)}, }) d := &mockDistributor{} @@ -251,11 +251,11 @@ func TestDistributorQuerier_Select_MixedChunkseriesTimeseriesAndStreamingResults Chunkseries: []client.TimeSeriesChunk{ { Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "one"}}, - Chunks: convertToChunks(t, samplesToInterface(s1)), + Chunks: convertToChunks(t, samplesToInterface(s1), false), }, { Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "two"}}, - Chunks: convertToChunks(t, samplesToInterface(s1)), + Chunks: convertToChunks(t, samplesToInterface(s1), false), }, }, @@ -377,11 +377,11 @@ func TestDistributorQuerier_Select_MixedFloatAndIntegerHistograms(t *testing.T) Chunkseries: []client.TimeSeriesChunk{ { Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "one"}}, - Chunks: convertToChunks(t, histogramsToInterface(s1)), + Chunks: convertToChunks(t, histogramsToInterface(s1), false), }, { Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "two"}}, - Chunks: convertToChunks(t, histogramsToInterface(s1)), + Chunks: convertToChunks(t, histogramsToInterface(s1), false), }, }, @@ -476,11 +476,11 @@ func TestDistributorQuerier_Select_MixedHistogramsAndFloatSamples(t *testing.T) Chunkseries: []client.TimeSeriesChunk{ { Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "one"}}, - Chunks: convertToChunks(t, append(samplesToInterface(s1), histogramsToInterface(h1)...)), + Chunks: convertToChunks(t, append(samplesToInterface(s1), histogramsToInterface(h1)...), false), }, { Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "two"}}, - Chunks: convertToChunks(t, append(samplesToInterface(s1), histogramsToInterface(h1)...)), + Chunks: convertToChunks(t, append(samplesToInterface(s1), histogramsToInterface(h1)...), false), }, }, @@ -520,6 +520,123 @@ func TestDistributorQuerier_Select_MixedHistogramsAndFloatSamples(t *testing.T) require.NoError(t, seriesSet.Err()) } +func TestDistributorQuerier_Select_CounterResets(t *testing.T) { + makeHistogram := func(ts, val int64, hint mimirpb.Histogram_ResetHint) mimirpb.Histogram { + return mimirpb.Histogram{ + Count: &mimirpb.Histogram_CountInt{CountInt: uint64(val)}, + Sum: float64(val), + PositiveSpans: []mimirpb.BucketSpan{ + {Offset: 0, Length: 1}, + }, + PositiveDeltas: []int64{val}, + Timestamp: ts, + ResetHint: hint, + } + } + + for _, tc := range []struct { + name string + chunks []client.Chunk + queryStart, queryEnd int64 + 
expectedSamples []mimirpb.Histogram +}{ + { + // This might happen when an in-order chunk and an OOO chunk are returned from the same ingester. + name: "overlapping chunks", + chunks: append( + convertToChunks(t, histogramsToInterface([]mimirpb.Histogram{ + makeHistogram(100, 40, mimirpb.Histogram_UNKNOWN), + makeHistogram(700, 50, mimirpb.Histogram_NO), + }), false), + convertToChunks(t, histogramsToInterface([]mimirpb.Histogram{ + makeHistogram(200, 20, mimirpb.Histogram_UNKNOWN), + makeHistogram(300, 60, mimirpb.Histogram_NO), + makeHistogram(400, 70, mimirpb.Histogram_NO), + makeHistogram(500, 80, mimirpb.Histogram_NO), + makeHistogram(600, 90, mimirpb.Histogram_NO), + }), false)...), + expectedSamples: []mimirpb.Histogram{ + makeHistogram(100, 40, mimirpb.Histogram_UNKNOWN), + makeHistogram(200, 20, mimirpb.Histogram_UNKNOWN), + makeHistogram(300, 60, mimirpb.Histogram_NO), + makeHistogram(400, 70, mimirpb.Histogram_NO), + makeHistogram(500, 80, mimirpb.Histogram_NO), + makeHistogram(600, 90, mimirpb.Histogram_NO), + makeHistogram(700, 50, mimirpb.Histogram_UNKNOWN), + }, + }, + { + // This might happen when one of the ingesters hasn't ingested all the samples. + name: "duplicate samples in separate chunks", + chunks: append( + convertToChunks(t, histogramsToInterface([]mimirpb.Histogram{ + makeHistogram(100, 40, mimirpb.Histogram_UNKNOWN), + makeHistogram(300, 20, mimirpb.Histogram_NO), + }), true), + convertToChunks(t, histogramsToInterface([]mimirpb.Histogram{ + makeHistogram(100, 40, mimirpb.Histogram_UNKNOWN), + makeHistogram(200, 30, mimirpb.Histogram_NO), + makeHistogram(300, 20, mimirpb.Histogram_NO), + }), true)...), + expectedSamples: []mimirpb.Histogram{ + makeHistogram(100, 40, mimirpb.Histogram_UNKNOWN), + makeHistogram(200, 30, mimirpb.Histogram_UNKNOWN), + makeHistogram(300, 20, mimirpb.Histogram_UNKNOWN), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + responseTypes := map[string]struct { + combinedResponse client.CombinedQueryStreamResponse + }{ + "chunkseries": { + combinedResponse: client.CombinedQueryStreamResponse{ + Chunkseries: []client.TimeSeriesChunk{ + { + Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "one"}}, + Chunks: tc.chunks, + }, + }, + }, + }, + "streamingseries": { + combinedResponse: client.CombinedQueryStreamResponse{ + StreamingSeries: []client.StreamingSeries{ + { + Labels: labels.FromStrings(labels.MetricName, "one"), + Sources: []client.StreamingSeriesSource{ + {SeriesIndex: 0, StreamReader: createTestStreamReader([]client.QueryStreamSeriesChunks{ + {SeriesIndex: 0, Chunks: tc.chunks}, + })}, + }, + }, + }, + }, + }, + } + + for responseName, responseType := range responseTypes { + t.Run(responseName, func(t *testing.T) { + d := &mockDistributor{} + d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(responseType.combinedResponse, nil) + + ctx := user.InjectOrgID(context.Background(), "0") + queryable := NewDistributorQueryable(d, newMockConfigProvider(0), stats.NewQueryMetrics(prometheus.NewPedanticRegistry()), log.NewNopLogger()) + querier, err := queryable.Querier(tc.queryStart, tc.queryEnd) + require.NoError(t, err) + + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: tc.queryStart, End: tc.queryEnd}, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".*")) + require.True(t, seriesSet.Next()) + require.NoError(t, seriesSet.Err()) + + verifySeries(t, seriesSet.At(), labels.FromStrings(labels.MetricName, "one"),
histogramsToInterface(tc.expectedSamples)) + require.False(t, seriesSet.Next()) + }) + } + }) + } +} + func TestDistributorQuerier_LabelNames(t *testing.T) { const mint, maxt = 0, 10 @@ -652,21 +769,23 @@ func verifySeries(t *testing.T, series storage.Series, l labels.Labels, samples require.Nil(t, it.Err()) } -func convertToChunks(t *testing.T, samples []interface{}) []client.Chunk { +func convertToChunks(t *testing.T, samples []interface{}, allowOverflow bool) []client.Chunk { var ( overflow chunk.EncodedChunk + enc chunk.Encoding + ts int64 err error ) chunks := []chunk.Chunk{} - ensureChunk := func(enc chunk.Encoding, ts int64) { - if len(chunks) == 0 || chunks[len(chunks)-1].Data.Encoding() != enc { - c, err := chunk.NewForEncoding(enc) + ensureChunk := func(reqEnc chunk.Encoding, reqTs int64) { + enc = reqEnc + ts = reqTs + if len(chunks) == 0 || chunks[len(chunks)-1].Data.Encoding() != reqEnc { + c, err := chunk.NewForEncoding(reqEnc) require.NoError(t, err) - chunks = append(chunks, chunk.NewChunk(labels.EmptyLabels(), c, model.Time(ts), model.Time(ts))) - return + chunks = append(chunks, chunk.NewChunk(labels.EmptyLabels(), c, model.Time(reqTs), model.Time(reqTs))) } - chunks[len(chunks)-1].Through = model.Time(ts) } for _, s := range samples { @@ -686,7 +805,18 @@ func convertToChunks(t *testing.T, samples []interface{}) []client.Chunk { t.Errorf("convertToChunks - unhandled type: %T", s) } require.NoError(t, err) - require.Nil(t, overflow) + if overflow == nil { + chunks[len(chunks)-1].Through = model.Time(ts) + continue + } + if !allowOverflow { + require.Nil(t, overflow) + continue + } + c, err := chunk.NewForEncoding(enc) + require.NoError(t, err) + chunks = append(chunks, chunk.NewChunk(labels.EmptyLabels(), c, model.Time(ts), model.Time(ts))) + chunks[len(chunks)-1].Data = overflow } clientChunks, err := client.ToChunks(chunks) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c3ded9fd672..23905d68b1e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -277,7 +277,7 @@ func TestQuerier_QueryableReturnsChunksOutsideQueriedRange(t *testing.T) { mimirpb.Sample{TimestampMs: queryStart.Add(-9*time.Minute).Unix() * 1000, Value: 1}, mimirpb.Sample{TimestampMs: queryStart.Add(-8*time.Minute).Unix() * 1000, Value: 1}, mimirpb.Sample{TimestampMs: queryStart.Add(-7*time.Minute).Unix() * 1000, Value: 1}, - }), + }, false), }, // Series with data points before and after queryStart, but before queryEnd. { @@ -295,7 +295,7 @@ func TestQuerier_QueryableReturnsChunksOutsideQueriedRange(t *testing.T) { mimirpb.Sample{TimestampMs: queryStart.Add(+0*time.Minute).Unix() * 1000, Value: 29}, mimirpb.Sample{TimestampMs: queryStart.Add(+1*time.Minute).Unix() * 1000, Value: 31}, mimirpb.Sample{TimestampMs: queryStart.Add(+2*time.Minute).Unix() * 1000, Value: 37}, - }), + }, false), }, // Series with data points after queryEnd. 
{ @@ -305,7 +305,7 @@ func TestQuerier_QueryableReturnsChunksOutsideQueriedRange(t *testing.T) { mimirpb.Sample{TimestampMs: queryStart.Add(+5*time.Minute).Unix() * 1000, Value: 43}, mimirpb.Sample{TimestampMs: queryStart.Add(+6*time.Minute).Unix() * 1000, Value: 47}, mimirpb.Sample{TimestampMs: queryStart.Add(+7*time.Minute).Unix() * 1000, Value: 53}, - }), + }, false), }, }, }, @@ -374,8 +374,8 @@ func TestBatchMergeChunks(t *testing.T) { } } - c1 := convertToChunks(t, samplesToInterface(s1)) - c2 := convertToChunks(t, samplesToInterface(s2)) + c1 := convertToChunks(t, samplesToInterface(s1), false) + c2 := convertToChunks(t, samplesToInterface(s2), false) chunks12 := []client.Chunk{} chunks12 = append(chunks12, c1...) chunks12 = append(chunks12, c2...) diff --git a/pkg/storage/chunk/chunk.go b/pkg/storage/chunk/chunk.go index 03e8c746c30..99fa5085163 100644 --- a/pkg/storage/chunk/chunk.go +++ b/pkg/storage/chunk/chunk.go @@ -102,6 +102,11 @@ const BatchSize = 12 type Batch struct { Timestamps [BatchSize]int64 // Values stores float values related to this batch if ValueType is chunkenc.ValFloat. + // If ValueType is chunkenc.ValHistogram or chunkenc.ValFloatHistogram, it is used to store the iteratorID that the + // pointer value at the same index comes from. The iteratorID is required to ensure counter resets are calculated + // properly: if we switch between different iterators, we cannot trust that the counter reset hint calculated for + // the next sample is correct. See https://github.com/prometheus/prometheus/issues/15346 for more + // information. Values is reused in these two scenarios to save space. Values [BatchSize]float64 // PointerValues store pointers to non-float complex values like histograms, float histograms or future additions. // Since Batch is expected to be passed by value, the array needs to be constant sized, @@ -140,6 +145,18 @@ func (b *Batch) AtFloatHistogram() (int64, unsafe.Pointer) { return b.Timestamps[b.Index], b.PointerValues[b.Index] } +// GetIteratorID retrieves the non-overlapping iterator id. +// Only call when type is chunkenc.ValHistogram or chunkenc.ValFloatHistogram. +func (b *Batch) GetIteratorID() int { + return int(b.Values[b.Index]) +} + +// SetIteratorID stores the non-overlapping iterator id. +// Only call when type is chunkenc.ValHistogram or chunkenc.ValFloatHistogram. +func (b *Batch) SetIteratorID(id int) { + b.Values[b.Index] = float64(id) +} + // Chunk contains encoded timeseries data type Chunk struct { From model.Time `json:"from"` diff --git a/pkg/storage/chunk/prometheus_chunk.go b/pkg/storage/chunk/prometheus_chunk.go index ff9d9baed78..c33223b920b 100644 --- a/pkg/storage/chunk/prometheus_chunk.go +++ b/pkg/storage/chunk/prometheus_chunk.go @@ -117,8 +117,7 @@ func (p *prometheusHistogramChunk) AddFloatHistogram(_ int64, _ *histogram.Float } // AddHistogram adds another histogram to the chunk. While AddHistogram works, it is only implemented to make tests -// work, and should not be used in production. In particular, it appends all histograms to single chunk, and uses new -// Appender for each invocation. +// work, and should not be used in production. In particular, it uses a new Appender for each invocation.
func (p *prometheusHistogramChunk) AddHistogram(timestamp int64, h *histogram.Histogram) (EncodedChunk, error) { if p.chunk == nil { p.chunk = chunkenc.NewHistogramChunk() @@ -129,8 +128,13 @@ func (p *prometheusHistogramChunk) AddHistogram(timestamp int64, h *histogram.Hi return nil, err } - _, _, _, err = app.AppendHistogram(nil, timestamp, h, true) - return nil, err + c, recoded, _, err := app.AppendHistogram(nil, timestamp, h, false) + if err != nil || c == nil || recoded { + return nil, err + } + oP := newPrometheusHistogramChunk() + oP.chunk = c + return oP, nil } func (p *prometheusHistogramChunk) UnmarshalFromBuf(bytes []byte) error { diff --git a/pkg/util/test/histogram.go b/pkg/util/test/histogram.go index 5b7112cc299..cb6e73a6420 100644 --- a/pkg/util/test/histogram.go +++ b/pkg/util/test/histogram.go @@ -99,10 +99,10 @@ func GenerateTestSampleHistogram(i int) *model.SampleHistogram { // RequireHistogramEqual requires the two histograms to be equal. func RequireHistogramEqual(t require.TestingT, expected, actual *histogram.Histogram, msgAndArgs ...interface{}) { - require.EqualValues(t, expected, actual, msgAndArgs) + require.EqualValues(t, expected, actual, msgAndArgs...) } // RequireFloatHistogramEqual requires the two float histograms to be equal. func RequireFloatHistogramEqual(t require.TestingT, expected, actual *histogram.FloatHistogram, msgAndArgs ...interface{}) { - require.EqualValues(t, expected, actual, msgAndArgs) + require.EqualValues(t, expected, actual, msgAndArgs...) }
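For reviewers who want a feel for the hint handling introduced above, here is a minimal, self-contained Go sketch of the rule that batchStream.merge applies. It is not Mimir code: the sample type and the mergeByTime and mkHist helpers are hypothetical, and only the github.com/prometheus/prometheus/model/histogram types are real. The idea is that a NotCounterReset hint is only trustworthy while consecutive output samples come from the same source iterator; whenever the source changes, the hint is downgraded to UnknownCounterReset, while gauge and already-unknown hints are left untouched.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

// sample pairs a timestamp with a histogram and remembers which source
// iterator (the "id" in the patch above) it came from.
type sample struct {
	t      int64
	h      *histogram.Histogram
	source int
}

// mergeByTime merges two time-sorted slices. A sample's counter reset hint was
// computed against its predecessor within the same source, so once the merged
// output switches sources the hint is no longer trustworthy and is downgraded
// to UnknownCounterReset (gauge and already-unknown hints are kept as-is).
func mergeByTime(a, b []sample) []sample {
	out := make([]sample, 0, len(a)+len(b))
	prevSource := -1
	emit := func(s sample) {
		if prevSource != -1 && prevSource != s.source &&
			s.h.CounterResetHint != histogram.GaugeType &&
			s.h.CounterResetHint != histogram.UnknownCounterReset {
			s.h.CounterResetHint = histogram.UnknownCounterReset // downgrade in place
		}
		prevSource = s.source
		out = append(out, s)
	}
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].t < b[j].t:
			emit(a[i])
			i++
		case a[i].t > b[j].t:
			emit(b[j])
			j++
		default: // equal timestamps: keep the left sample, drop the duplicate
			emit(a[i])
			i++
			j++
		}
	}
	for ; i < len(a); i++ {
		emit(a[i])
	}
	for ; j < len(b); j++ {
		emit(b[j])
	}
	return out
}

func mkHist(count uint64, hint histogram.CounterResetHint) *histogram.Histogram {
	return &histogram.Histogram{Count: count, CounterResetHint: hint}
}

func main() {
	inOrder := []sample{
		{t: 0, h: mkHist(4, histogram.UnknownCounterReset), source: 0},
		{t: 2, h: mkHist(6, histogram.NotCounterReset), source: 0},
	}
	outOfOrder := []sample{
		{t: 1, h: mkHist(2, histogram.UnknownCounterReset), source: 1},
		{t: 3, h: mkHist(3, histogram.NotCounterReset), source: 1},
	}
	// Every switch between sources downgrades a NotCounterReset hint, so both
	// the t=2 and t=3 samples come out as UnknownCounterReset.
	for _, s := range mergeByTime(inOrder, outOfOrder) {
		fmt.Printf("t=%d count=%d hint=%v\n", s.t, s.h.Count, s.h.CounterResetHint)
	}
}

In the actual patch this role is played by the iterator id stored in Batch.Values and by batchStream.prevIteratorID, which persists across merge calls.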