Use slice pooling to populate the query stream response (#6466)
Signed-off-by: alanprot <[email protected]>
alanprot authored Dec 31, 2024
1 parent 5d593f5 commit 9d9d4bf
Showing 2 changed files with 57 additions and 25 deletions.
58 changes: 38 additions & 20 deletions pkg/ingester/ingester.go
@@ -33,6 +33,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/wlog"
+	"github.com/prometheus/prometheus/util/zeropool"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/shipper"
@@ -95,6 +96,8 @@ const (
 var (
 	errExemplarRef      = errors.New("exemplars not ingested because series not already present")
 	errIngesterStopping = errors.New("ingester stopping")
+
+	tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]
 )

// Config for an Ingester.
@@ -2055,7 +2058,8 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
 		return 0, 0, 0, 0, ss.Err()
 	}
 
-	chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
+	chunkSeries := getTimeSeriesChunksSlice()
+	defer putTimeSeriesChunksSlice(chunkSeries)
 	batchSizeBytes := 0
 	var it chunks.Iterator
 	for ss.Next() {
@@ -3072,6 +3076,31 @@ func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
 	_, _ = w.Write([]byte(respMsg))
 }
 
+func (i *Ingester) getInstanceLimits() *InstanceLimits {
+	// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
+	if i.State() == services.Starting {
+		return nil
+	}
+
+	if i.cfg.InstanceLimitsFn == nil {
+		return defaultInstanceLimits
+	}
+
+	l := i.cfg.InstanceLimitsFn()
+	if l == nil {
+		return defaultInstanceLimits
+	}
+
+	return l
+}
+
+// stopIncomingRequests is called during the shutdown process.
+func (i *Ingester) stopIncomingRequests() {
+	i.stoppedMtx.Lock()
+	defer i.stoppedMtx.Unlock()
+	i.stopped = true
+}
+
 // metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
 func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
 	if queryIngestersWithin > 0 {
@@ -3129,27 +3158,16 @@ func wrappedTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesL
 	)
 }
 
-func (i *Ingester) getInstanceLimits() *InstanceLimits {
-	// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
-	if i.State() == services.Starting {
-		return nil
-	}
-
-	if i.cfg.InstanceLimitsFn == nil {
-		return defaultInstanceLimits
+func getTimeSeriesChunksSlice() []client.TimeSeriesChunk {
+	if p := tsChunksPool.Get(); p != nil {
+		return p
 	}
 
-	l := i.cfg.InstanceLimitsFn()
-	if l == nil {
-		return defaultInstanceLimits
-	}
-
-	return l
+	return make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
 }
 
-// stopIncomingRequests is called during the shutdown process.
-func (i *Ingester) stopIncomingRequests() {
-	i.stoppedMtx.Lock()
-	defer i.stoppedMtx.Unlock()
-	i.stopped = true
+func putTimeSeriesChunksSlice(p []client.TimeSeriesChunk) {
+	if p != nil {
+		tsChunksPool.Put(p[:0])
+	}
 }
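
For context (not part of the commit): below is a minimal, self-contained sketch of the pooling pattern the new getTimeSeriesChunksSlice/putTimeSeriesChunksSlice helpers implement. chunkBatch and the capacity of 128 are illustrative stand-ins for client.TimeSeriesChunk and queryStreamBatchSize; as in the commit, a zero-value zeropool.Pool is assumed to return nil from Get until a slice has been Put back, which is what the nil check relies on.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/zeropool"
)

// chunkBatch is an illustrative stand-in for client.TimeSeriesChunk.
type chunkBatch struct{ series string }

// Zero-value pool, declared like tsChunksPool in the commit.
var batchPool zeropool.Pool[[]chunkBatch]

func getBatch() []chunkBatch {
	if b := batchPool.Get(); b != nil {
		return b // reused slice: length 0, capacity kept from a previous use
	}
	return make([]chunkBatch, 0, 128) // 128 stands in for queryStreamBatchSize
}

func putBatch(b []chunkBatch) {
	if b != nil {
		batchPool.Put(b[:0]) // reset length to 0, keep the backing array
	}
}

func main() {
	b := getBatch()
	b = append(b, chunkBatch{series: "foo"})
	putBatch(b)

	// Typically reuses the pooled backing array (sync.Pool-backed, so
	// reuse is not guaranteed); either way len is 0 and cap is 128.
	b2 := getBatch()
	fmt.Println(len(b2), cap(b2)) // 0 128
}

The reason to reach for zeropool rather than a plain sync.Pool is that sync.Pool stores interface{} values, so putting a slice in it boxes the slice header and allocates on every Put; zeropool's generic API is designed to avoid that. Resetting with p[:0] keeps the backing array, so later appends up to the old capacity allocate nothing.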
24 changes: 19 additions & 5 deletions pkg/ingester/ingester_test.go
@@ -3445,10 +3445,25 @@ func (m *mockQueryStreamServer) Context() context.Context {
 }
 
 func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
-	benchmarkQueryStream(b)
+	tc := []struct {
+		samplesCount, seriesCount int
+	}{
+		{samplesCount: 10, seriesCount: 10},
+		{samplesCount: 10, seriesCount: 50},
+		{samplesCount: 10, seriesCount: 100},
+		{samplesCount: 50, seriesCount: 10},
+		{samplesCount: 50, seriesCount: 50},
+		{samplesCount: 50, seriesCount: 100},
+	}
+
+	for _, c := range tc {
+		b.Run(fmt.Sprintf("samplesCount=%v; seriesCount=%v", c.samplesCount, c.seriesCount), func(b *testing.B) {
+			benchmarkQueryStream(b, c.samplesCount, c.seriesCount)
+		})
+	}
 }
 
-func benchmarkQueryStream(b *testing.B) {
+func benchmarkQueryStream(b *testing.B, samplesCount, seriesCount int) {
 	cfg := defaultIngesterTestConfig(b)
 
 	// Create ingester.
@@ -3465,7 +3480,6 @@ func benchmarkQueryStream(b *testing.B) {
 	// Push series.
 	ctx := user.InjectOrgID(context.Background(), userID)
 
-	const samplesCount = 1000
 	samples := make([]cortexpb.Sample, 0, samplesCount)
 
 	for i := 0; i < samplesCount; i++ {
@@ -3475,15 +3489,14 @@ func benchmarkQueryStream(b *testing.B) {
 		})
 	}
 
-	const seriesCount = 100
 	for s := 0; s < seriesCount; s++ {
 		_, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: strconv.Itoa(s)}}, samples))
 		require.NoError(b, err)
 	}
 
 	req := &client.QueryRequest{
 		StartTimestampMs: 0,
-		EndTimestampMs:   samplesCount + 1,
+		EndTimestampMs:   int64(samplesCount + 1),
 
 		Matchers: []*client.LabelMatcher{{
 			Type: client.EQUAL,
@@ -3495,6 +3508,7 @@ func benchmarkQueryStream(b *testing.B) {
 	mockStream := &mockQueryStreamServer{ctx: ctx}
 
 	b.ResetTimer()
+	b.ReportAllocs()
 
 	for ix := 0; ix < b.N; ix++ {
 		err := i.QueryStream(req, mockStream)
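
Side note (not part of the commit): with b.ReportAllocs() in place, allocation figures appear in the benchmark output without needing the -benchmem flag, so the sub-benchmarks can be compared before and after the pooling change with something like:

go test -run='^$' -bench=BenchmarkIngester_QueryStream_Chunks ./pkg/ingester/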
