Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support metadata federated query #6461

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flag to configure the number of workers processing federated queries, and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func NewQuerierHandler(
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
engine promql.QueryEngine,
distributor Distributor,
metadataQuerier querier.MetadataQuerier,
reg prometheus.Registerer,
logger log.Logger,
) http.Handler {
Expand Down Expand Up @@ -266,7 +266,7 @@ func NewQuerierHandler(

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor))
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
Expand All @@ -279,7 +279,7 @@ func NewQuerierHandler(

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor))
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ type Cortex struct {
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
MetadataQuerier querier.MetadataQuerier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware tripperware.Tripperware

Expand Down
7 changes: 6 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger)

// Use distributor as default MetadataQuerier
t.MetadataQuerier = t.Distributor

// Register the default endpoints that are always enabled for the querier module
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)

Expand All @@ -274,6 +277,8 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)

}
return nil, nil
}
Expand Down Expand Up @@ -335,7 +340,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
t.QuerierQueryable,
t.ExemplarQueryable,
t.QuerierEngine,
t.Distributor,
t.MetadataQuerier,
prometheus.DefaultRegisterer,
util_log.Logger,
)
Expand Down
11 changes: 9 additions & 2 deletions pkg/querier/metadata_handler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package querier

import (
"context"
"net/http"

"github.com/prometheus/prometheus/scrape"

"github.com/cortexproject/cortex/pkg/util"
)

// MetadataQuerier is the source of metric metadata used by MetadataHandler.
// Any type with a matching MetricsMetadata method satisfies it.
type MetadataQuerier interface {
// MetricsMetadata returns the metric metadata for the request carried by
// ctx, or an error if it cannot be retrieved.
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

type metricMetadata struct {
Type string `json:"type"`
Help string `json:"help"`
Expand All @@ -25,9 +32,9 @@ type metadataResult struct {

// MetadataHandler returns metric metadata held by Cortex for a given tenant.
// It is kept and returned as a set.
func MetadataHandler(d Distributor) http.Handler {
func MetadataHandler(m MetadataQuerier) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp, err := d.MetricsMetadata(r.Context())
resp, err := m.MetricsMetadata(r.Context())
if err != nil {
w.WriteHeader(http.StatusBadRequest)
util.WriteJSONResponse(w, metadataResult{Status: statusError, Error: err.Error()})
Expand Down
109 changes: 109 additions & 0 deletions pkg/querier/tenantfederation/metadata_merge_querier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package tenantfederation

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/scrape"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

// NewMetadataQuerier wraps upstream in a querier.MetadataQuerier that merges
// metric metadata across the tenants of a request.
//
// maxConcurrent is stored and later passed to concurrency.ForEach when the
// per-tenant lookups are run. A histogram named
// cortex_querier_federated_tenants_per_metadata_query is created and
// registered on reg via promauto.
func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg prometheus.Registerer) querier.MetadataQuerier {
	tenantsHistogram := promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "querier_federated_tenants_per_metadata_query",
		Help:      "Number of tenants per metadata query.",
		Buckets:   []float64{1, 2, 4, 8, 16, 32, 64},
	})

	merged := &mergeMetadataQuerier{
		maxConcurrent:           maxConcurrent,
		tenantsPerMetadataQuery: tenantsHistogram,
		upstream:                upstream,
	}
	return merged
}

// mergeMetadataQuerier is the querier.MetadataQuerier built by
// NewMetadataQuerier. It queries upstream once per tenant and merges the
// results.
type mergeMetadataQuerier struct {
// maxConcurrent is passed to concurrency.ForEach when running the
// per-tenant jobs.
maxConcurrent int
// tenantsPerMetadataQuery observes the number of tenant IDs resolved for
// each MetricsMetadata call.
tenantsPerMetadataQuery prometheus.Histogram
// upstream is the querier that answers the single-tenant requests.
upstream querier.MetadataQuerier
}

// metadataSelectJob describes one per-tenant metadata lookup scheduled by
// mergeMetadataQuerier.MetricsMetadata.
type metadataSelectJob struct {
// pos is the index in the results slice where this job stores its answer.
pos int
// querier is the upstream querier the job calls.
querier querier.MetadataQuerier
// id is the tenant ID injected as the org ID for the upstream call.
id string
}

// MetricsMetadata returns the metric metadata of every tenant resolved from
// ctx, merged into one slice with exact duplicates removed.
//
// The number of resolved tenants is recorded in the tenantsPerMetadataQuery
// histogram. When exactly one tenant is resolved, the call is forwarded to the
// upstream querier with ctx as-is. Otherwise the upstream querier is called
// once per tenant, with that tenant's ID injected as the org ID, and the jobs
// are run through concurrency.ForEach with maxConcurrent.
//
// Results keep tenant order and, within a tenant, upstream order; the first
// occurrence of a duplicated entry wins. Any error from tenant resolution or
// from a per-tenant lookup is returned and no metadata is returned with it.
func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
	log, ctx := spanlogger.New(ctx, "mergeMetadataQuerier.MetricsMetadata")
	defer log.Span.Finish()

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, err
	}

	m.tenantsPerMetadataQuery.Observe(float64(len(tenantIDs)))

	// Nothing to merge for a single tenant: delegate directly.
	if len(tenantIDs) == 1 {
		return m.upstream.MetricsMetadata(ctx)
	}

	jobs := make([]interface{}, len(tenantIDs))
	results := make([][]scrape.MetricMetadata, len(tenantIDs))
	for pos, tenantID := range tenantIDs {
		jobs[pos] = &metadataSelectJob{
			pos:     pos,
			querier: m.upstream,
			id:      tenantID,
		}
	}

	run := func(ctx context.Context, jobIntf interface{}) error {
		job, ok := jobIntf.(*metadataSelectJob)
		if !ok {
			return fmt.Errorf("unexpected type %T", jobIntf)
		}

		res, err := job.querier.MetricsMetadata(user.InjectOrgID(ctx, job.id))
		if err != nil {
			// Wrapf already appends the cause, so err is not formatted
			// into the message a second time.
			return errors.Wrapf(err, "error querying metadata %s", job.id)
		}

		// Each job writes only to its own index of results.
		results[job.pos] = res
		return nil
	}

	if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil {
		return nil, err
	}

	// Drop entries that are identical across (or within) tenants.
	var ret []scrape.MetricMetadata
	seen := make(map[scrape.MetricMetadata]struct{})
	for _, tenantMetadata := range results {
		for _, md := range tenantMetadata {
			if _, dup := seen[md]; dup {
				continue
			}
			seen[md] = struct{}{}
			ret = append(ret, md)
		}
	}

	return ret, nil
}
146 changes: 146 additions & 0 deletions pkg/querier/tenantfederation/metadata_merge_querier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package tenantfederation

import (
"context"
"fmt"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
)

// Expected text exposition of the
// cortex_querier_federated_tenants_per_metadata_query histogram, compared
// against the registry in Test_mergeMetadataQuerier_MetricsMetadata.
var (
// expectedSingleTenantsMetadataMetrics is the expected histogram after a
// single observation of 1 tenant.
expectedSingleTenantsMetadataMetrics = `
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query.
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1
cortex_querier_federated_tenants_per_metadata_query_sum 1
cortex_querier_federated_tenants_per_metadata_query_count 1
`

// expectedTwoTenantsMetadataMetrics is the expected histogram after a
// single observation of 2 tenants.
expectedTwoTenantsMetadataMetrics = `
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query.
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 0
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1
cortex_querier_federated_tenants_per_metadata_query_sum 2
cortex_querier_federated_tenants_per_metadata_query_count 1
`
)

// mockMetadataQuerier is a test double for the upstream metadata querier. It
// answers from a fixed per-tenant table.
type mockMetadataQuerier struct {
// tenantIdToMetadata maps a tenant ID to the metadata returned for it.
tenantIdToMetadata map[string][]scrape.MetricMetadata
}

// MetricsMetadata returns the canned metadata registered for the first tenant
// ID resolved from ctx. It returns an error if tenant resolution fails or if
// no entry exists for that tenant.
func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
	// tenant.TenantIDs is used because of the lint check
	// `ensure the query path is supporting multiple tenants`.
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, err
	}

	first := tenantIDs[0]
	metadata, found := m.tenantIdToMetadata[first]
	if !found {
		return nil, fmt.Errorf("tenant not found, tenantId: %s", first)
	}
	return metadata, nil
}

// Test_mergeMetadataQuerier_MetricsMetadata checks the merged metadata and the
// tenants-per-query histogram for single-tenant, two-tenant and
// duplicate-entry requests.
func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) {
	// Install the multi-tenant resolver; the cases below use org IDs such as
	// "user-1|user-2" and expect them to count as two tenants.
	tenant.WithDefaultResolver(tenant.NewMultiResolver())

	// Metadata entries shared by the cases below.
	var (
		md1 = scrape.MetricMetadata{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}
		md2 = scrape.MetricMetadata{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}
		md3 = scrape.MetricMetadata{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}
	)

	type testCase struct {
		name        string
		perTenant   map[string][]scrape.MetricMetadata
		orgID       string
		wantResults []scrape.MetricMetadata
		wantMetrics string
	}

	cases := []testCase{
		{
			name: "single tenant",
			perTenant: map[string][]scrape.MetricMetadata{
				"user-1": {md1},
			},
			orgID:       "user-1",
			wantResults: []scrape.MetricMetadata{md1},
			wantMetrics: expectedSingleTenantsMetadataMetrics,
		},
		{
			name: "should be merged two tenants results",
			perTenant: map[string][]scrape.MetricMetadata{
				"user-1": {md1},
				"user-2": {md2, md3},
			},
			orgID:       "user-1|user-2",
			wantResults: []scrape.MetricMetadata{md1, md2, md3},
			wantMetrics: expectedTwoTenantsMetadataMetrics,
		},
		{
			name: "should be deduplicated when the same metadata exist",
			perTenant: map[string][]scrape.MetricMetadata{
				"user-1": {md1, md2},
				"user-2": {md2},
			},
			orgID:       "user-1|user-2",
			wantResults: []scrape.MetricMetadata{md1, md2},
			wantMetrics: expectedTwoTenantsMetadataMetrics,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// A fresh registry per case keeps the histogram counts isolated.
			registry := prometheus.NewPedanticRegistry()
			upstream := &mockMetadataQuerier{tenantIdToMetadata: tc.perTenant}
			merger := NewMetadataQuerier(upstream, defaultMaxConcurrency, registry)

			ctx := user.InjectOrgID(context.Background(), tc.orgID)
			got, err := merger.MetricsMetadata(ctx)
			require.NoError(t, err)

			wantMetrics := strings.NewReader(tc.wantMetrics)
			require.NoError(t, testutil.GatherAndCompare(registry, wantMetrics, "cortex_querier_federated_tenants_per_metadata_query"))
			require.Equal(t, tc.wantResults, got)
		})
	}
}
Loading