Skip to content

Commit

Permalink
Add native histogram for tracking push requests size
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Dec 4, 2024
1 parent 320e475 commit 98b4613
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Distributor: Add a `cortex_distributor_push_requests_uncompressed_bytes` native histogram to track uncompressed push requests in bytes for tenant and format. #6384
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand All @@ -289,7 +289,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
Expand Down Expand Up @@ -322,12 +322,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
Expand Down
34 changes: 33 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ type Distributor struct {
latestSeenSampleTimestampPerUser *prometheus.GaugeVec

validateMetrics *validation.ValidateMetrics

// metrics passed to push handler
PushHandlerMetrics *PushHandlerMetrics
}

// Config contains the configuration required to
Expand Down Expand Up @@ -167,6 +170,32 @@ type Config struct {
OTLPConfig OTLPConfig `yaml:"otlp"`
}

type PushHandlerMetrics struct {
pushRequestSizeBytes *prometheus.HistogramVec
}

func NewPushHandlerMetrics(reg prometheus.Registerer) *PushHandlerMetrics {
return &PushHandlerMetrics{
pushRequestSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_push_requests_uncompressed_bytes",
Help: "Histogram of push request's uncompressed size in bytes",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user", "format"}),
}
}

func (m *PushHandlerMetrics) ObservePushRequestSize(user, format string, size float64) {
if m != nil {
m.pushRequestSizeBytes.WithLabelValues(user, format).Observe(size)
}
}

func (m *PushHandlerMetrics) deleteUserMetrics(user string) {
m.pushRequestSizeBytes.DeleteLabelValues(user)
}

type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
Expand Down Expand Up @@ -365,7 +394,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),

validateMetrics: validation.NewValidateMetrics(reg),
validateMetrics: validation.NewValidateMetrics(reg),
PushHandlerMetrics: NewPushHandlerMetrics(reg),
}

promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Expand Down Expand Up @@ -468,6 +498,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)

d.PushHandlerMetrics.deleteUserMetrics(userID)

if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestMarshall(t *testing.T) {
plentySize = 1024 * 1024
)
req := cortexpb.WriteRequest{}
err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
_, err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
require.Error(t, err)
err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
_, err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
require.NoError(t, err)
require.Equal(t, numSeries, len(req.Timeseries))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler {
ctx := r.Context()
var req client.ReadRequest
logger := util_log.WithContext(r.Context(), logger)
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
if _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ const (
)

// ParseProtoReader parses a compressed proto from an io.Reader.
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) (int, error) {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
}
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp)
if err != nil {
return err
return 0, err
}

if sp != nil {
Expand All @@ -171,10 +171,10 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi
err = proto.NewBuffer(body).Unmarshal(req)
}
if err != nil {
return err
return 0, err
}

return nil
return len(body), nil
}

func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestParseProtoReader(t *testing.T) {
reader = bytesBuffered{Buffer: &buf}
}

err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
_, err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
if tt.expectErr {
assert.NotNil(t, err)
return
Expand Down
24 changes: 19 additions & 5 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func, metrics *distributor.PushHandlerMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := util_log.WithContext(ctx, util_log.Logger)
Expand All @@ -51,7 +51,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
return
}

req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize, userID, metrics)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -99,7 +99,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
})
}

func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int, userID string, metrics *distributor.PushHandlerMetrics) (pmetricotlp.ExportRequest, error) {
expectedSize := int(r.ContentLength)
if expectedSize > maxSize {
return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
Expand All @@ -124,7 +124,17 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
otlpReqProto := otlpProtoMessage{req: &req}
return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)

bodySize, err := util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
if err != nil {
return req, err
}

if metrics != nil {
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
}

return req, nil
}
case jsonContentType:
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
Expand All @@ -143,11 +153,15 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
if expectedSize > 0 {
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
}
_, err := buf.ReadFrom(reader)
bodySize, err := buf.ReadFrom(reader)
if err != nil {
return req, err
}

if metrics != nil {
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
}

return req, req.UnmarshalJSON(buf.Bytes())
}
default:
Expand Down
39 changes: 38 additions & 1 deletion pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -236,12 +239,20 @@ func TestOTLPWriteHandler(t *testing.T) {
expectedErrMsg string
gzipCompression bool
encodingType string
expectedMetrics string
}{
{
description: "Test proto format write with no compression",
maxRecvMsgSize: 10000,
format: pbContentType,
expectedStatusCode: http.StatusOK,
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_bytes histogram
cortex_distributor_push_requests_uncompressed_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_bytes_sum{format="otlp",user="user-1"} 665
cortex_distributor_push_requests_uncompressed_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "Test proto format write with gzip",
Expand All @@ -250,12 +261,26 @@ func TestOTLPWriteHandler(t *testing.T) {
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
gzipCompression: true,
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_bytes histogram
cortex_distributor_push_requests_uncompressed_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_bytes_sum{format="otlp",user="user-1"} 665
cortex_distributor_push_requests_uncompressed_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "Test json format write with no compression",
maxRecvMsgSize: 10000,
format: jsonContentType,
expectedStatusCode: http.StatusOK,
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_bytes histogram
cortex_distributor_push_requests_uncompressed_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_bytes_sum{format="otlp",user="user-1"} 1568
cortex_distributor_push_requests_uncompressed_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "Test json format write with gzip",
Expand All @@ -264,6 +289,13 @@ func TestOTLPWriteHandler(t *testing.T) {
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
gzipCompression: true,
expectedMetrics: `
# HELP cortex_distributor_push_requests_uncompressed_bytes Histogram of push request's uncompressed size in bytes
# TYPE cortex_distributor_push_requests_uncompressed_bytes histogram
cortex_distributor_push_requests_uncompressed_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
cortex_distributor_push_requests_uncompressed_bytes_sum{format="otlp",user="user-1"} 1568
cortex_distributor_push_requests_uncompressed_bytes_count{format="otlp",user="user-1"} 1
`,
},
{
description: "request too big than maxRecvMsgSize (proto) with no compression",
Expand Down Expand Up @@ -351,14 +383,19 @@ func TestOTLPWriteHandler(t *testing.T) {
push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
require.NoError(t, err)
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push)
reg := prometheus.NewRegistry()
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push, distributor.NewPushHandlerMetrics(reg))

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(t, test.expectedStatusCode, resp.StatusCode)

if test.expectedMetrics != "" {
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_distributor_push_requests"))
}

if test.expectedErrMsg != "" {
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
Expand Down
21 changes: 19 additions & 2 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ import (
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
)

const (
formatRemoteWrite1 = "prw1"
formatOTLP = "otlp"
)

// Func defines the type of the push. It is similar to http.HandlerFunc.
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushMetrics *distributor.PushHandlerMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
Expand All @@ -28,14 +35,24 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
logger = log.WithSourceIPs(source, logger)
}
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return
}

var req cortexpb.PreallocWriteRequest
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
bodySize, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if pushMetrics != nil {
pushMetrics.ObservePushRequestSize(userID, formatRemoteWrite1, float64(bodySize))
}

req.SkipLabelNameValidation = false
if req.Source == 0 {
req.Source = cortexpb.API
Expand Down
Loading

0 comments on commit 98b4613

Please sign in to comment.