Skip to content

Commit

Permalink
Improve streaming on MetricsForLabelMatchersStream method (#6436)
Browse files Browse the repository at this point in the history
* Improve streaming on MetricsForLabelMatchersStream method

Signed-off-by: alanprot <[email protected]>

* Update pkg/ingester/ingester.go

Co-authored-by: SungJin1212 <[email protected]>
Signed-off-by: Alan Protasio <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
Signed-off-by: Alan Protasio <[email protected]>
Co-authored-by: SungJin1212 <[email protected]>
  • Loading branch information
alanprot and SungJin1212 authored Dec 31, 2024
1 parent c243963 commit 5d593f5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
70 changes: 40 additions & 30 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,27 +1699,42 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
result, cleanup, err := i.metricsForLabelMatchersCommon(ctx, req)
result := &client.MetricsForLabelMatchersResponse{}
cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error {
result.Metric = append(result.Metric, &cortexpb.Metric{
Labels: cortexpb.FromLabelsToLabelAdapters(l),
})
return nil
})
defer cleanup()
return result, err
}

func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) error {
result, cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req)
result := &client.MetricsForLabelMatchersStreamResponse{}

cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error {
result.Metric = append(result.Metric, &cortexpb.Metric{
Labels: cortexpb.FromLabelsToLabelAdapters(l),
})

if len(result.Metric) >= metadataStreamBatchSize {
err := client.SendMetricsForLabelMatchersStream(stream, result)
if err != nil {
return err
}
result.Metric = result.Metric[:0]
}
return nil
})
defer cleanup()
if err != nil {
return err
}

for i := 0; i < len(result.Metric); i += metadataStreamBatchSize {
j := i + metadataStreamBatchSize
if j > len(result.Metric) {
j = len(result.Metric)
}
resp := &client.MetricsForLabelMatchersStreamResponse{
Metric: result.Metric[i:j],
}
err := client.SendMetricsForLabelMatchersStream(stream, resp)
// Send last batch
if len(result.Metric) > 0 {
err := client.SendMetricsForLabelMatchersStream(stream, result)
if err != nil {
return err
}
Expand All @@ -1731,36 +1746,36 @@ func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatc
// metricsForLabelMatchersCommon returns all the metrics which match a set of matchers.
// this should be used by MetricsForLabelMatchers and MetricsForLabelMatchersStream.
// the cleanup function should be called in order to close the querier
func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, func(), error) {
func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *client.MetricsForLabelMatchersRequest, acc func(labels.Labels) error) (func(), error) {
cleanup := func() {}
if err := i.checkRunning(); err != nil {
return nil, cleanup, err
return cleanup, err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, cleanup, err
return cleanup, err
}

db := i.getTSDB(userID)
if db == nil {
return &client.MetricsForLabelMatchersResponse{}, cleanup, nil
return cleanup, nil
}

// Parse the request
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
if err != nil {
return nil, cleanup, err
return cleanup, err
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, cleanup, err
return cleanup, err
}

q, err := db.Querier(mint, maxt)
if err != nil {
return nil, cleanup, err
return cleanup, err
}

cleanup = func() {
Expand All @@ -1783,7 +1798,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
for _, matchers := range matchersSet {
// Interrupt if the context has been canceled.
if ctx.Err() != nil {
return nil, cleanup, ctx.Err()
return cleanup, ctx.Err()
}

seriesSet := q.Select(ctx, true, hints, matchers...)
Expand All @@ -1794,28 +1809,23 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
mergedSet = q.Select(ctx, false, hints, matchersSet[0]...)
}

// Generate the response merging all series sets.
result := &client.MetricsForLabelMatchersResponse{
Metric: make([]*cortexpb.Metric, 0),
}

cnt := 0
for mergedSet.Next() {
cnt++
// Interrupt if the context has been canceled.
if cnt%util.CheckContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, cleanup, ctx.Err()
return cleanup, ctx.Err()
}
if err := acc(mergedSet.At().Labels()); err != nil {
return cleanup, err
}

result.Metric = append(result.Metric, &cortexpb.Metric{
Labels: cortexpb.FromLabelsToLabelAdapters(mergedSet.At().Labels()),
})
if limit > 0 && len(result.Metric) >= limit {
if limit > 0 && cnt >= limit {
break
}
}

return result, cleanup, nil
return cleanup, nil
}

// MetricsMetadata returns all the metric metadata of a user.
Expand Down
21 changes: 21 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3073,6 +3073,12 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
res, err := i.MetricsForLabelMatchers(ctx, req)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expected, res.Metric)

// Stream
ss := mockMetricsForLabelMatchersStreamServer{ctx: ctx}
err = i.MetricsForLabelMatchersStream(req, &ss)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expected, ss.res.Metric)
})
}
}
Expand Down Expand Up @@ -3407,6 +3413,21 @@ func writeRequestSingleSeries(lbls labels.Labels, samples []cortexpb.Sample) *co
return req
}

type mockMetricsForLabelMatchersStreamServer struct {
grpc.ServerStream
ctx context.Context
res client.MetricsForLabelMatchersStreamResponse
}

func (m *mockMetricsForLabelMatchersStreamServer) Send(response *client.MetricsForLabelMatchersStreamResponse) error {
m.res.Metric = append(m.res.Metric, response.Metric...)
return nil
}

func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
return m.ctx
}

type mockQueryStreamServer struct {
grpc.ServerStream
ctx context.Context
Expand Down

0 comments on commit 5d593f5

Please sign in to comment.