Skip to content

Commit

Permalink
Support exemplar federated query
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Dec 24, 2024
1 parent ea848b6 commit 1015a30
Show file tree
Hide file tree
Showing 4 changed files with 587 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier))
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, byPassForSingleQuerier, prometheus.DefaultRegisterer)
}
return nil, nil
}
Expand Down
211 changes: 211 additions & 0 deletions pkg/querier/tenantfederation/exemplar_merge_queryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package tenantfederation

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

// NewExemplarQueryable returns a exemplarQueryable that iterates through all the
// tenant IDs that are part of the request and aggregates the results from each
// tenant's ExemplarQuerier by sending of subsequent requests.
// By setting byPassWithSingleQuerier to true the mergeExemplarQuerier gets by-passed
// and results for request with a single exemplar querier will not contain the
// "__tenant_id__" label. This allows a smoother transition, when enabling
// tenant federation in a cluster.
// The result contains a label "__tenant_id__" to identify the tenant ID that
// it originally resulted from.
// If the label "__tenant_id__" is already existing, its value is overwritten
// by the tenant ID and the previous value is exposed through a new label
// prefixed with "original_". This behaviour is not implemented recursively.
func NewExemplarQueryable(upstream storage.ExemplarQueryable, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable {
return NewMergeExemplarQueryable(defaultTenantLabel, tenantExemplarQuerierCallback(upstream), byPassWithSingleQuerier, reg)
}

func tenantExemplarQuerierCallback(exemplarQueryable storage.ExemplarQueryable) MergeExemplarQuerierCallback {
return func(ctx context.Context) ([]string, []storage.ExemplarQuerier, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, nil, err
}

var queriers = make([]storage.ExemplarQuerier, len(tenantIDs))
for pos, tenantID := range tenantIDs {
q, err := exemplarQueryable.ExemplarQuerier(user.InjectOrgID(ctx, tenantID))
if err != nil {
return nil, nil, err
}
queriers[pos] = q
}

return tenantIDs, queriers, nil
}
}

// MergeExemplarQuerierCallback returns the underlying exemplar queriers and their
// IDs relevant for the query.
type MergeExemplarQuerierCallback func(ctx context.Context) (ids []string, queriers []storage.ExemplarQuerier, err error)

// NewMergeExemplarQueryable returns a queryable that merges results from multiple
// underlying ExemplarQueryables.
// By setting byPassWithSingleQuerier to true the mergeExemplarQuerier gets by-passed
// and results for request with a single exemplar querier will not contain the
// "__tenant_id__" label. This allows a smoother transition, when enabling
// tenant federation in a cluster.
// Results contain a label `idLabelName` to identify the underlying exemplar queryable
// that it originally resulted from.
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeExemplarQueryable(idLabelName string, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable {
return &mergeExemplarQueryable{
idLabelName: idLabelName,
byPassWithSingleQuerier: byPassWithSingleQuerier,
callback: callback,

tenantsPerExemplarQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "querier_federated_tenants_per_exemplar_query",
Help: "Number of tenants per exemplar query.",
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
}),
}
}

type mergeExemplarQueryable struct {
idLabelName string
byPassWithSingleQuerier bool
callback MergeExemplarQuerierCallback
tenantsPerExemplarQuery prometheus.Histogram
}

// ExemplarQuerier returns a new mergeExemplarQuerier which aggregates results from
// multiple exemplar queriers into a single result.
func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
ids, queriers, err := m.callback(ctx)
if err != nil {
return nil, err
}

m.tenantsPerExemplarQuery.Observe(float64(len(ids)))

if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0], nil
}

return &mergeExemplarQuerier{
ctx: ctx,
idLabelName: m.idLabelName,
tenantIds: ids,
queriers: queriers,
byPassWithSingleQuerier: m.byPassWithSingleQuerier,
}, nil
}

// mergeExemplarQuerier aggregates the results from underlying exemplar queriers
// and adds a label `idLabelName` to identify the exemplar queryable that
// `seriesLabels` resulted from.
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
type mergeExemplarQuerier struct {
ctx context.Context
idLabelName string
tenantIds []string
queriers []storage.ExemplarQuerier
byPassWithSingleQuerier bool
}

type exemplarSelectJob struct {
pos int
querier storage.ExemplarQuerier
id string
}

// Select returns aggregated exemplars within given time range for multiple tenants.
func (m mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
log, ctx := spanlogger.New(m.ctx, "mergeExemplarQuerier.Select")
defer log.Span.Finish()

// filter out tenants to query and unrelated matchers
allMatchedTenantIds, allUnrelatedMatchers := filterAllTenantsAndMatchers(m.idLabelName, m.tenantIds, matchers)
jobs := make([]interface{}, len(allMatchedTenantIds))
results := make([][]exemplar.QueryResult, len(allMatchedTenantIds))

var jobPos int
for idx, tenantId := range m.tenantIds {
if _, ok := allMatchedTenantIds[tenantId]; !ok {
// skip tenantIds that should not be queried
continue
}

jobs[jobPos] = &exemplarSelectJob{
pos: jobPos,
querier: m.queriers[idx],
id: tenantId,
}
jobPos++
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*exemplarSelectJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}

res, err := job.querier.Select(start, end, allUnrelatedMatchers...)
if err != nil {
return errors.Wrapf(err, "error exemplars querying %s %s", rewriteLabelName(m.idLabelName), job.id)
}

// append __tenant__ label to `seriesLabels` to identify each tenants per exemplar query result
for i, e := range res {
e.SeriesLabels = setLabelsRetainExisting(e.SeriesLabels, labels.Label{
Name: m.idLabelName,
Value: job.id,
})
res[i] = e
}

results[job.pos] = res
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
if err != nil {
return nil, err
}

var ret []exemplar.QueryResult
for _, exemplars := range results {
ret = append(ret, exemplars...)
}

return ret, nil
}

func filterAllTenantsAndMatchers(idLabelName string, tenantIds []string, allMatchers [][]*labels.Matcher) (map[string]struct{}, [][]*labels.Matcher) {
allMatchedTenantIds := make(map[string]struct{})
allUnrelatedMatchers := make([][]*labels.Matcher, len(allMatchers))

for idx, matchers := range allMatchers {
matchedTenantIds, unrelatedMatchers := filterValuesByMatchers(idLabelName, tenantIds, matchers...)
for tenantId := range matchedTenantIds {
allMatchedTenantIds[tenantId] = struct{}{}
}
allUnrelatedMatchers[idx] = unrelatedMatchers
}

return allMatchedTenantIds, allUnrelatedMatchers
}
Loading

0 comments on commit 1015a30

Please sign in to comment.