Skip to content

Commit

Permalink
Support metadata federated query
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Dec 26, 2024
1 parent 8a46d20 commit f2e51c1
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func NewQuerierHandler(
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
engine promql.QueryEngine,
distributor Distributor,
metadataQuerier querier.MetadataQuerier,
reg prometheus.Registerer,
logger log.Logger,
) http.Handler {
Expand Down Expand Up @@ -266,7 +266,7 @@ func NewQuerierHandler(

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor))
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
Expand All @@ -279,7 +279,7 @@ func NewQuerierHandler(

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor))
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ type Cortex struct {
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
MetadataQuerier querier.MetadataQuerier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware tripperware.Tripperware

Expand Down
7 changes: 6 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger)

// Use distributor as default MetadataQuerier
t.MetadataQuerier = t.Distributor

// Register the default endpoints that are always enabled for the querier module
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)

Expand All @@ -274,6 +277,8 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)

}
return nil, nil
}
Expand Down Expand Up @@ -335,7 +340,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
t.QuerierQueryable,
t.ExemplarQueryable,
t.QuerierEngine,
t.Distributor,
t.MetadataQuerier,
prometheus.DefaultRegisterer,
util_log.Logger,
)
Expand Down
11 changes: 9 additions & 2 deletions pkg/querier/metadata_handler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package querier

import (
"context"
"net/http"

"github.com/prometheus/prometheus/scrape"

"github.com/cortexproject/cortex/pkg/util"
)

type MetadataQuerier interface {
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

type metricMetadata struct {
Type string `json:"type"`
Help string `json:"help"`
Expand All @@ -25,9 +32,9 @@ type metadataResult struct {

// MetadataHandler returns metric metadata held by Cortex for a given tenant.
// It is kept and returned as a set.
func MetadataHandler(d Distributor) http.Handler {
func MetadataHandler(m MetadataQuerier) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp, err := d.MetricsMetadata(r.Context())
resp, err := m.MetricsMetadata(r.Context())
if err != nil {
w.WriteHeader(http.StatusBadRequest)
util.WriteJSONResponse(w, metadataResult{Status: statusError, Error: err.Error()})
Expand Down
107 changes: 107 additions & 0 deletions pkg/querier/tenantfederation/metadata_merge_querier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package tenantfederation

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/scrape"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg prometheus.Registerer) querier.MetadataQuerier {
return &mergeMetadataQuerier{
upstream: upstream,
maxConcurrent: maxConcurrent,

tenantsPerMetadataQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "querier_federated_tenants_per_metadata_query",
Help: "Number of tenants per metadata query.",
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
}),
}
}

type mergeMetadataQuerier struct {
maxConcurrent int
tenantsPerMetadataQuery prometheus.Histogram
upstream querier.MetadataQuerier
}

type metadataSelectJob struct {
pos int
querier querier.MetadataQuerier
id string
}

// MetricsMetadata returns aggregated metadata for multiple tenants
func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
log, ctx := spanlogger.New(ctx, "mergeMetadataQuerier.MetricsMetadata")
defer log.Span.Finish()

tenantIds, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}

m.tenantsPerMetadataQuery.Observe(float64(len(tenantIds)))

if len(tenantIds) == 1 {
//
return m.upstream.MetricsMetadata(ctx)
}

jobs := make([]interface{}, len(tenantIds))
results := make([][]scrape.MetricMetadata, len(tenantIds))

var jobPos int
for _, tenantId := range tenantIds {
jobs[jobPos] = &metadataSelectJob{
pos: jobPos,
querier: m.upstream,
id: tenantId,
}
jobPos++
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*metadataSelectJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}

res, err := job.querier.MetricsMetadata(user.InjectOrgID(ctx, job.id))
if err != nil {
return errors.Wrapf(err, "error exemplars querying %s %s", job.id, err)
}

results[job.pos] = res
return nil
}

err = concurrency.ForEach(ctx, jobs, m.maxConcurrent, run)
if err != nil {
return nil, err
}

var ret []scrape.MetricMetadata
deduplicated := make(map[scrape.MetricMetadata]struct{})
for _, metadata := range results {
for _, m := range metadata {
if _, ok := deduplicated[m]; !ok {
ret = append(ret, m)
deduplicated[m] = struct{}{}
}
}
}

return ret, nil
}
144 changes: 144 additions & 0 deletions pkg/querier/tenantfederation/metadata_merge_querier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package tenantfederation

import (
"context"
"fmt"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
)

var (
expectedSingleTenantsMetadataMetrics = `
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query.
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1
cortex_querier_federated_tenants_per_metadata_query_sum 1
cortex_querier_federated_tenants_per_metadata_query_count 1
`

expectedTwoTenantsMetadataMetrics = `
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query.
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 0
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1
cortex_querier_federated_tenants_per_metadata_query_sum 2
cortex_querier_federated_tenants_per_metadata_query_count 1
`
)

type mockMetadataQuerier struct {
tenantIdToMetadata map[string][]scrape.MetricMetadata
}

func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
tenantId, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

if res, ok := m.tenantIdToMetadata[tenantId]; !ok {
return nil, fmt.Errorf("tenant not found, tenantId: %s", tenantId)
} else {
return res, nil
}
}

func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) {
// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())

tests := []struct {
name string
tenantIdToMetadata map[string][]scrape.MetricMetadata
orgId string
expectedResults []scrape.MetricMetadata
expectedMetrics string
}{
{
name: "single tenant",
tenantIdToMetadata: map[string][]scrape.MetricMetadata{
"user-1": {
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
},
},
orgId: "user-1",
expectedResults: []scrape.MetricMetadata{
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
},
expectedMetrics: expectedSingleTenantsMetadataMetrics,
},
{
name: "should be merged two tenants results",
tenantIdToMetadata: map[string][]scrape.MetricMetadata{
"user-1": {
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
},
"user-2": {
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""},
},
},
orgId: "user-1|user-2",
expectedResults: []scrape.MetricMetadata{
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""},
},
expectedMetrics: expectedTwoTenantsMetadataMetrics,
},
{
name: "should be deduplicated when the same metadata exist",
tenantIdToMetadata: map[string][]scrape.MetricMetadata{
"user-1": {
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
},
"user-2": {
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
},
},
orgId: "user-1|user-2",
expectedResults: []scrape.MetricMetadata{
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""},
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""},
},
expectedMetrics: expectedTwoTenantsMetadataMetrics,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
upstream := mockMetadataQuerier{
tenantIdToMetadata: test.tenantIdToMetadata,
}

mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg)
metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId))
require.NoError(t, err)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query"))
require.Equal(t, test.expectedResults, metadata)
})
}
}

0 comments on commit f2e51c1

Please sign in to comment.