Ensure native histograms counter reset hints are corrected when merging results from different sources (#9909)

Since prometheus/prometheus#15343, Prometheus returns an unknown counter reset hint for the first sample of each native histogram chunk, so mixing in-order and out-of-order chunks does not lead to over- or under-detecting counter resets.

However, chunks may overlap, and Mimir's merge algorithm has to fix counter reset over- and under-detection in that case as well. The strategy is to keep track of which non-overlapping iterator each batch comes from and to set the reset hint to unknown whenever the iterator changes. This is similar to what the chained sample iterator does in Prometheus, but a little more complicated:
https://github.com/prometheus/prometheus/blob/cd1f8ac129a289be8e7d98b6de57a9ba5814c406/storage/merge.go#L490
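
A minimal sketch of the idea in Go (the batch type and fixResetHints below are illustrative stand-ins for what this commit does, not the actual Mimir code):

package batchsketch

import "github.com/prometheus/prometheus/model/histogram"

// batch is a simplified stand-in for Mimir's batch type: a run of samples
// that all come from the same non-overlapping source iterator.
type batch struct {
	id    int                          // index of the source iterator
	hints []histogram.CounterResetHint // counter reset hints, one per sample
}

// fixResetHints walks the merged batches in output order and sets the first
// hint to unknown whenever the source iterator changes, because a counter
// reset cannot be reliably detected across samples from different sources.
func fixResetHints(merged []batch) {
	prevID := -1
	for i := range merged {
		if merged[i].id != prevID && len(merged[i].hints) > 0 {
			merged[i].hints[0] = histogram.UnknownCounterReset
		}
		prevID = merged[i].id
	}
}

The real change threads the source index through newNonOverlappingIterator and the batch stream's merge (see the merge.go diff below), so the merged stream knows when consecutive batches come from different sources.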

* Add krajo's test (currently failing)
* Add test showing single ingester return
* Add failing test
* Fix bug overdetecting unknowns
* Link batches from same iterator
* Clarify comment
* Update pkg/querier/batch/stream.go
* Update pkg/querier/batch/stream.go
* Fix stream test
* Fix native histogram counter resets when merging overlapping chunks
* Set UnknownCounterReset instead of NotCounterReset hints in some tests
* Pass id as constructor arg
* Remove unused function
* Add tests for checking hints
* More complex reset tests
* Add interleaving test
* Clean up ingester test
* Refactor ingester querying tests and add streaming case
* Add store-gw tests

* Update CHANGELOG

Signed-off-by: György Krajcsovits <[email protected]>
Co-authored-by: George Krajcsovits <[email protected]>
fionaliao and krajorama authored Dec 2, 2024
1 parent d43cc31 commit 3ed38a7
Showing 16 changed files with 853 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -86,6 +86,7 @@
* [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666
* [BUGFIX] Fix issue when using the experimental `-ruler.max-independent-rule-evaluation-concurrency` feature, where the ruler could panic as it updates a running ruleset or shutdowns. #9726
* [BUGFIX] Always return unknown hint for first sample in non-gauge native histograms chunk to avoid incorrect counter reset hints when merging chunks from different sources. #10033
+* [BUGFIX] Ensure native histograms counter reset hints are corrected when merging results from different sources. #9909
* [BUGFIX] Ingester: Fix race condition in per-tenant TSDB creation. #9708
* [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765
* [BUGFIX] Ingester: Fix race condition in native histogram appending. #9765
177 changes: 177 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -44,6 +44,7 @@ import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
@@ -5749,6 +5750,182 @@ func TestIngester_QueryExemplars(t *testing.T) {
	})
}

// This test shows that a single ingester returns compacted OOO and in-order chunks separately, even if they overlap.
func TestIngester_QueryStream_CounterResets(t *testing.T) {
	// Create ingester.
	cfg := defaultIngesterTestConfig(t)
	cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Hour // Long enough to not be reached during the test.
	cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout = 1 * time.Second
	cfg.BlocksStorageConfig.TSDB.HeadCompactionIntervalJitterEnabled = false
	cfg.TSDBConfigUpdatePeriod = 1 * time.Second

	// Set the OOO window to 30 minutes and enable native histograms.
	limits := map[string]*validation.Limits{
		userID: {
			OutOfOrderTimeWindow:                model.Duration(30 * time.Minute),
			OOONativeHistogramsIngestionEnabled: true,
			NativeHistogramsIngestionEnabled:    true,
		},
	}
	override, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits))
	require.NoError(t, err)

	i, err := prepareIngesterWithBlockStorageAndOverrides(t, cfg, override, nil, "", "", nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

	// Wait until it's healthy.
	test.Poll(t, 1*time.Second, 1, func() interface{} {
		return i.lifecycler.HealthyInstancesCount()
	})

	// Push series.
	ctx := user.InjectOrgID(context.Background(), userID)

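	// The in-order samples pushed below (t=0,2,4 with values 4,6,8) increase,
	// while the out-of-order samples (t=1,3 with values 2,3) dip below them,
	// so the merged series has counter resets at t=1 and t=3.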
	histLbls := labels.FromStrings(labels.MetricName, "foo", "series_id", strconv.Itoa(0), "type", "histogram")
	histReq := mockHistogramWriteRequest(histLbls, int64(0), 4, false)
	_, err = i.Push(ctx, histReq)
	require.NoError(t, err)

	histReq = mockHistogramWriteRequest(histLbls, int64(2), 6, false)
	_, err = i.Push(ctx, histReq)
	require.NoError(t, err)

	histReq = mockHistogramWriteRequest(histLbls, int64(4), 8, false)
	_, err = i.Push(ctx, histReq)
	require.NoError(t, err)

	histReq = mockHistogramWriteRequest(histLbls, int64(1), 2, false)
	_, err = i.Push(ctx, histReq)
	require.NoError(t, err)

	histReq = mockHistogramWriteRequest(histLbls, int64(3), 3, false)
	_, err = i.Push(ctx, histReq)
	require.NoError(t, err)

	// Create a gRPC server used to query back the data.
	serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
	defer serv.GracefulStop()
	client.RegisterIngesterServer(serv, i)

	listener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	go func() {
		require.NoError(t, serv.Serve(listener))
	}()

	inst := ring.InstanceDesc{Id: "test", Addr: listener.Addr().String()}
	c, err := client.MakeIngesterClient(inst, defaultClientTestConfig(), client.NewMetrics(nil))
	require.NoError(t, err)
	defer c.Close()

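	// runQuery queries the ingester over gRPC and returns, for each returned
	// chunk (sorted by start time), its counter reset header and decoded samples.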
	runQuery := func() ([]chunkenc.CounterResetHeader, [][]sample) {
		s, err := c.QueryStream(ctx, &client.QueryRequest{
			StartTimestampMs: 0,
			EndTimestampMs:   5,

			Matchers: []*client.LabelMatcher{{
				Type:  client.EQUAL,
				Name:  model.MetricNameLabel,
				Value: "foo",
			}},
		})
		require.NoError(t, err)

		recvMsgs := 0

		chunks := []client.Chunk{}
		for {
			resp, err := s.Recv()
			if errors.Is(err, io.EOF) {
				break
			}
			require.NoError(t, err)

			for _, c := range resp.Chunkseries {
				chunks = append(chunks, c.Chunks...)
			}
			recvMsgs++
		}

		require.Equal(t, 1, recvMsgs)
		// Sort chunks by time.
		sort.Slice(chunks, func(i, j int) bool {
			return chunks[i].StartTimestampMs < chunks[j].StartTimestampMs
		})

		headers := []chunkenc.CounterResetHeader{}
		var samples [][]sample
		for _, c := range chunks {
			require.Equal(t, int32(chunk.PrometheusHistogramChunk), c.Encoding)
			chk, err := chunkenc.FromData(chunkenc.EncHistogram, c.Data)
			require.NoError(t, err)

			s := []sample{}
			it := chk.Iterator(nil)
			for it.Next() != chunkenc.ValNone {
				ts, h := it.AtHistogram(nil)
				s = append(s, sample{t: ts, h: h})
			}
			samples = append(samples, s)
			headers = append(headers, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
		}
		return headers, samples
	}

// Check samples before compaction (OOO and in-order samples are merged when both are in the head).
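	// Merged in the head, the values run 4, 2, 6, 3, 8 for t=0..4, so chunks
	// are cut at the detected resets: [0], [1 2] and [3 4].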
	actHeaders, actSamples := runQuery()
	require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset, chunkenc.CounterReset}, actHeaders)
	require.Equal(t, [][]sample{
		{
			{t: 0, h: histogramWithHint(4, histogram.UnknownCounterReset)},
		},
		{
			{t: 1, h: histogramWithHint(2, histogram.UnknownCounterReset)},
			{t: 2, h: histogramWithHint(6, histogram.NotCounterReset)},
		},
		{
			{t: 3, h: histogramWithHint(3, histogram.UnknownCounterReset)},
			{t: 4, h: histogramWithHint(8, histogram.NotCounterReset)},
		},
	}, actSamples)

	time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter)))

	// Compaction
	i.compactBlocks(context.Background(), false, 0, nil) // Should be compacted because the TSDB is idle.
	verifyCompactedHead(t, i, true)

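	// After compaction, the in-order and OOO samples live in separate,
	// overlapping chunks. The ingester returns them without merging, each
	// with an unknown counter reset header.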
	actHeaders, actSamples = runQuery()
	require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, actHeaders)
	require.Equal(t, [][]sample{
		{
			{t: 0, h: histogramWithHint(4, histogram.UnknownCounterReset)},
			{t: 2, h: histogramWithHint(6, histogram.NotCounterReset)},
			{t: 4, h: histogramWithHint(8, histogram.NotCounterReset)},
		},
		{
			{t: 1, h: histogramWithHint(2, histogram.UnknownCounterReset)},
			{t: 3, h: histogramWithHint(3, histogram.NotCounterReset)},
		},
	}, actSamples)
}

func histogramWithHint(idx int, hint histogram.CounterResetHint) *histogram.Histogram {
	h := util_test.GenerateTestHistogram(idx)
	h.CounterResetHint = hint
	return h
}

type sample struct {
	t int64
	h *histogram.Histogram
}

func writeRequestSingleSeries(lbls labels.Labels, samples []mimirpb.Sample) *mimirpb.WriteRequest {
	req := &mimirpb.WriteRequest{
		Source: mimirpb.API,
57 changes: 47 additions & 10 deletions pkg/querier/batch/chunk_test.go
@@ -6,6 +6,7 @@
package batch

import (
+	"slices"
	"strconv"
	"testing"
	"time"
@@ -82,7 +83,15 @@ func mkGenericChunk(t require.TestingT, from model.Time, points int, encoding ch
	return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator)
}

-func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding) {
+type testBatchOptions uint
+
+const (
+	// setNotCounterResetHintsAsUnknown can be used in cases where it's onerous to generate all the expected counter
+	// reset hints (e.g. merging lots of chunks together).
+	setNotCounterResetHintsAsUnknown testBatchOptions = iota
+)
+
+func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding, opts ...testBatchOptions) {
	nextExpectedTS := model.TimeFromUnix(0)
	var assertPoint func(i int)
	switch encoding {
@@ -100,8 +109,15 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
		ts, h := iter.AtHistogram(nil)
		require.EqualValues(t, int64(nextExpectedTS), ts, strconv.Itoa(i))
		expH := test.GenerateTestHistogram(int(nextExpectedTS))
-		if nextExpectedTS > 0 {
-			expH.CounterResetHint = histogram.NotCounterReset
+		if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
+			if h.CounterResetHint == histogram.NotCounterReset {
+				h.CounterResetHint = histogram.UnknownCounterReset
+				expH.CounterResetHint = histogram.UnknownCounterReset
+			}
+		} else {
+			if nextExpectedTS > 0 {
+				expH.CounterResetHint = histogram.NotCounterReset
+			}
		}
		test.RequireHistogramEqual(t, expH, h, strconv.Itoa(i))
		nextExpectedTS = nextExpectedTS.Add(step)
@@ -112,8 +128,15 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
		ts, fh := iter.AtFloatHistogram(nil)
		require.EqualValues(t, int64(nextExpectedTS), ts, strconv.Itoa(i))
		expFH := test.GenerateTestFloatHistogram(int(nextExpectedTS))
-		if nextExpectedTS > 0 {
-			expFH.CounterResetHint = histogram.NotCounterReset
+		if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
+			if fh.CounterResetHint == histogram.NotCounterReset {
+				fh.CounterResetHint = histogram.UnknownCounterReset
+				expFH.CounterResetHint = histogram.UnknownCounterReset
+			}
+		} else {
+			if nextExpectedTS > 0 {
+				expFH.CounterResetHint = histogram.NotCounterReset
+			}
		}
		test.RequireFloatHistogramEqual(t, expFH, fh, strconv.Itoa(i))
		nextExpectedTS = nextExpectedTS.Add(step)
@@ -127,7 +150,7 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
	require.Equal(t, chunkenc.ValNone, iter.Next())
}

-func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding) {
+func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding, opts ...testBatchOptions) {
	var assertPoint func(expectedTS int64, valType chunkenc.ValueType)
	switch encoding {
	case chunk.PrometheusXorChunk:
@@ -144,8 +167,15 @@ func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
		ts, h := iter.AtHistogram(nil)
		require.EqualValues(t, expectedTS, ts)
		expH := test.GenerateTestHistogram(int(expectedTS))
-		if expectedTS > 0 {
-			expH.CounterResetHint = histogram.NotCounterReset
+		if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
+			if h.CounterResetHint == histogram.NotCounterReset {
+				h.CounterResetHint = histogram.UnknownCounterReset
+				expH.CounterResetHint = histogram.UnknownCounterReset
+			}
+		} else {
+			if expectedTS > 0 {
+				expH.CounterResetHint = histogram.NotCounterReset
+			}
		}
		test.RequireHistogramEqual(t, expH, h)
		require.NoError(t, iter.Err())
@@ -156,8 +186,15 @@ func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
		ts, fh := iter.AtFloatHistogram(nil)
		require.EqualValues(t, expectedTS, ts)
		expFH := test.GenerateTestFloatHistogram(int(expectedTS))
-		if expectedTS > 0 {
-			expFH.CounterResetHint = histogram.NotCounterReset
+		if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
+			if fh.CounterResetHint == histogram.NotCounterReset {
+				fh.CounterResetHint = histogram.UnknownCounterReset
+				expFH.CounterResetHint = histogram.UnknownCounterReset
+			}
+		} else {
+			if expectedTS > 0 {
+				expFH.CounterResetHint = histogram.NotCounterReset
+			}
		}
		test.RequireFloatHistogramEqual(t, expFH, fh)
		require.NoError(t, iter.Err())
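For illustration, a hypothetical use of the new option (assuming iter iterates over many overlapping histogram chunks merged together, where spelling out every expected hint would be onerous):

testIter(t, 200, iter, chunk.PrometheusHistogramChunk, setNotCounterResetHintsAsUnknown)
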
8 changes: 4 additions & 4 deletions pkg/querier/batch/merge.go
@@ -50,7 +50,7 @@ func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator {
		c.batches = newBatchStream(len(c.its), &c.hPool, &c.fhPool)
	}
	for i, cs := range css {
-		c.its[i] = newNonOverlappingIterator(c.its[i], cs, &c.hPool, &c.fhPool)
+		c.its[i] = newNonOverlappingIterator(c.its[i], i, cs, &c.hPool, &c.fhPool)
	}

	for _, iter := range c.its {
@@ -138,7 +138,7 @@ func (c *mergeIterator) buildNextBatch(size int) chunkenc.ValueType {
	// is before all iterators next entry.
	for len(c.h) > 0 && (c.batches.len() == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
		batch := c.h[0].Batch()
-		c.batches.merge(&batch, size)
+		c.batches.merge(&batch, size, c.h[0].id)

		if c.h[0].Next(size) != chunkenc.ValNone {
			heap.Fix(&c.h, 0)
@@ -165,7 +165,7 @@ func (c *mergeIterator) Err() error {
	return c.currErr
}

-type iteratorHeap []iterator
+type iteratorHeap []*nonOverlappingIterator

func (h *iteratorHeap) Len() int      { return len(*h) }
func (h *iteratorHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
@@ -177,7 +177,7 @@ func (h *iteratorHeap) Less(i, j int) bool {
}

func (h *iteratorHeap) Push(x interface{}) {
-	*h = append(*h, x.(iterator))
+	*h = append(*h, x.(*nonOverlappingIterator))
}

func (h *iteratorHeap) Pop() interface{} {