Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE - Update GRPC #6399

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
golang.org/x/net v0.32.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.8.0
google.golang.org/grpc v1.68.0
google.golang.org/grpc v1.70.0-dev.0.20241210174008-e4d084a6ece3
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -92,7 +92,7 @@ require (
cloud.google.com/go v0.115.1 // indirect
cloud.google.com/go/auth v0.9.3 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
cloud.google.com/go/iam v1.1.13 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
Expand Down Expand Up @@ -239,8 +239,8 @@ require (
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/api v0.195.0 // indirect
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/telebot.v3 v3.2.1 // indirect
k8s.io/apimachinery v0.31.1 // indirect
Expand Down Expand Up @@ -275,6 +275,3 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
// Same replace used by thanos: (may be removed in the future)
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497

// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
1,216 changes: 68 additions & 1,148 deletions go.sum

Large diffs are not rendered by default.

233 changes: 233 additions & 0 deletions integration/grpc_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"context"
"flag"
"fmt"
"math/rand"
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

type mockGprcServer struct {
ingester_client.IngesterServer
}

func (m mockGprcServer) QueryStream(_ *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
md, _ := metadata.FromIncomingContext(streamServer.Context())
i, _ := strconv.Atoi(md["i"][0])
return streamServer.Send(createStreamResponse(i))
}

func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
md, _ := metadata.FromIncomingContext(ctx)
i, _ := strconv.Atoi(md["i"][0])
expected := createRequest(i)

if expected.String() != request.String() {
return nil, fmt.Errorf("expected %v, got %v", expected, request)
}
return nil, nil
}

func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
savedRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = prometheus.NewRegistry()
defer func() {
prometheus.DefaultRegisterer = savedRegistry
}()

grpcPort, closeGrpcPort, err := getLocalHostPort()
require.NoError(t, err)
httpPort, closeHTTPPort, err := getLocalHostPort()
require.NoError(t, err)

err = closeGrpcPort()
require.NoError(t, err)
err = closeHTTPPort()
require.NoError(t, err)

cfg.HTTPListenPort = httpPort
cfg.GRPCListenPort = grpcPort

serv, err := server.New(cfg)
require.NoError(t, err)
register(serv.GRPC)

go func() {
err := serv.Run()
require.NoError(t, err)
}()

defer serv.Shutdown()

grpcHost := fmt.Sprintf("localhost:%d", grpcPort)

clientConfig := grpcclient.Config{}
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

dialOptions, err := clientConfig.DialOption(nil, nil)
assert.NoError(t, err)
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)

conn, err := grpc.NewClient(grpcHost, dialOptions...)
assert.NoError(t, err)
validate(t, conn)
}

func TestConcurrentGrpcCalls(t *testing.T) {
cfg := server.Config{}
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

tc := map[string]struct {
cfg server.Config
register func(s *grpc.Server)
validate func(t *testing.T, con *grpc.ClientConn)
}{
"distributor": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
distributorpb.RegisterDistributorServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := distributorpb.NewDistributorClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
_, err := client.Push(ctx, createRequest(i))
require.NoError(t, err)
}(i)
}

wg.Wait()
},
},
"ingester": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
ingester_client.RegisterIngesterServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := ingester_client.NewIngesterClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
require.NoError(t, err)
resp, err := s.Recv()
require.NoError(t, err)
expected := createStreamResponse(i)
require.Equal(t, expected.String(), resp.String())
}(i)
}

wg.Wait()
},
},
}

for name, c := range tc {
t.Run(name, func(t *testing.T) {
run(t, c.cfg, c.register, c.validate)
})
}
}

func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
{
FromIngesterId: strconv.Itoa(i),
Labels: createLabels(i),
Chunks: []ingester_client.Chunk{
{
StartTimestampMs: int64(i),
EndTimestampMs: int64(i),
Encoding: int32(i),
Data: []byte(strconv.Itoa(i)),
},
},
},
}}
}

func createRequest(i int) *cortexpb.WriteRequest {
labels := createLabels(i)
return &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
{
TimeSeries: &cortexpb.TimeSeries{
Labels: labels,
Samples: []cortexpb.Sample{
{TimestampMs: int64(i), Value: float64(i)},
},
Exemplars: []cortexpb.Exemplar{
{
Labels: labels,
Value: float64(i),
TimestampMs: int64(i),
},
},
},
},
},
}
}

func createLabels(i int) []cortexpb.LabelPair {
labels := make([]cortexpb.LabelPair, 0, 100)
for j := 0; j < 100; j++ {
labels = append(labels, cortexpb.LabelPair{
Name: fmt.Sprintf("test%d_%d", i, j),
Value: fmt.Sprintf("test%d_%d", i, j),
})
}
return labels
}

func getLocalHostPort() (int, func() error, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, nil, err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, nil, err
}

closePort := func() error {
return l.Close()
}
return l.Addr().(*net.TCPAddr).Port, closePort, nil
}
20 changes: 10 additions & 10 deletions pkg/cortexpb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms [
//
// Note: while resulting labels.Labels is supposedly sorted, this function
// doesn't enforce that. If input is not sorted, output will be wrong.
func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels {
func FromLabelAdaptersToLabels(ls []LabelPair) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&ls))
}

// FromLabelAdaptersToLabelsWithCopy converts []LabelAdapter to labels.Labels.
// Do NOT use unsafe to convert between data types because this function may
// get in input labels whose data structure is reused.
func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels {
func FromLabelAdaptersToLabelsWithCopy(input []LabelPair) labels.Labels {
return CopyLabels(FromLabelAdaptersToLabels(input))
}

Expand Down Expand Up @@ -107,29 +107,29 @@ func copyStringToBuffer(in string, buf []byte) (string, []byte) {
// FromLabelsToLabelAdapters casts labels.Labels to []LabelAdapter.
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
// This allows us to use labels.Labels directly in protos.
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelAdapter {
return *(*[]LabelAdapter)(unsafe.Pointer(&ls))
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelPair {
return *(*[]LabelPair)(unsafe.Pointer(&ls))
}

// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric.
// Don't do this on any performance sensitive paths.
func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric {
func FromLabelAdaptersToMetric(ls []LabelPair) model.Metric {
return util.LabelsToMetric(FromLabelAdaptersToLabels(ls))
}

// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric with copy.
// Don't do this on any performance sensitive paths.
func FromLabelAdaptersToMetricWithCopy(ls []LabelAdapter) model.Metric {
func FromLabelAdaptersToMetricWithCopy(ls []LabelPair) model.Metric {
return util.LabelsToMetric(FromLabelAdaptersToLabelsWithCopy(ls))
}

// FromMetricsToLabelAdapters converts model.Metric to []LabelAdapter.
// Don't do this on any performance sensitive paths.
// The result is sorted.
func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
result := make([]LabelAdapter, 0, len(metric))
func FromMetricsToLabelAdapters(metric model.Metric) []LabelPair {
result := make([]LabelPair, 0, len(metric))
for k, v := range metric {
result = append(result, LabelAdapter{
result = append(result, LabelPair{
Name: string(k),
Value: string(v),
})
Expand Down Expand Up @@ -162,7 +162,7 @@ func FromExemplarProtosToExemplars(es []Exemplar) []exemplar.Exemplar {
return result
}

type byLabel []LabelAdapter
type byLabel []LabelPair

func (s byLabel) Len() int { return len(s) }
func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 }
Expand Down
6 changes: 3 additions & 3 deletions pkg/cortexpb/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestMetricMetadataToMetricTypeToMetricType(t *testing.T) {
}

func TestFromLabelAdaptersToLabels(t *testing.T) {
input := []LabelAdapter{{Name: "hello", Value: "world"}}
input := []LabelPair{{Name: "hello", Value: "world"}}
expected := labels.Labels{labels.Label{Name: "hello", Value: "world"}}
actual := FromLabelAdaptersToLabels(input)

Expand All @@ -115,7 +115,7 @@ func TestFromLabelAdaptersToLabels(t *testing.T) {
}

func TestFromLabelAdaptersToLabelsWithCopy(t *testing.T) {
input := []LabelAdapter{{Name: "hello", Value: "world"}}
input := []LabelPair{{Name: "hello", Value: "world"}}
expected := labels.Labels{labels.Label{Name: "hello", Value: "world"}}
actual := FromLabelAdaptersToLabelsWithCopy(input)

Expand All @@ -127,7 +127,7 @@ func TestFromLabelAdaptersToLabelsWithCopy(t *testing.T) {
}

func BenchmarkFromLabelAdaptersToLabelsWithCopy(b *testing.B) {
input := []LabelAdapter{
input := []LabelPair{
{Name: "hello", Value: "world"},
{Name: "some label", Value: "and its value"},
{Name: "long long long long long label name", Value: "perhaps even longer label value, but who's counting anyway?"}}
Expand Down
Loading
Loading