Skip to content

Commit

Permalink
Removing LabelAdapter
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Dec 11, 2024
1 parent 8d79acc commit c9e23c3
Show file tree
Hide file tree
Showing 160 changed files with 6,713 additions and 5,118 deletions.
11 changes: 4 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
golang.org/x/net v0.32.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.8.0
google.golang.org/grpc v1.68.0
google.golang.org/grpc v1.70.0-dev.0.20241210174008-e4d084a6ece3
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -92,7 +92,7 @@ require (
cloud.google.com/go v0.115.1 // indirect
cloud.google.com/go/auth v0.9.3 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
cloud.google.com/go/iam v1.1.13 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
Expand Down Expand Up @@ -239,8 +239,8 @@ require (
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/api v0.195.0 // indirect
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/telebot.v3 v3.2.1 // indirect
k8s.io/apimachinery v0.31.1 // indirect
Expand Down Expand Up @@ -275,6 +275,3 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
// Same replace used by thanos: (may be removed in the future)
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497

// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
1,216 changes: 68 additions & 1,148 deletions go.sum

Large diffs are not rendered by default.

233 changes: 233 additions & 0 deletions integration/grpc_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"context"
"flag"
"fmt"
"math/rand"
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

type mockGprcServer struct {
ingester_client.IngesterServer
}

func (m mockGprcServer) QueryStream(_ *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
md, _ := metadata.FromIncomingContext(streamServer.Context())
i, _ := strconv.Atoi(md["i"][0])
return streamServer.Send(createStreamResponse(i))
}

func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
md, _ := metadata.FromIncomingContext(ctx)
i, _ := strconv.Atoi(md["i"][0])
expected := createRequest(i)

if expected.String() != request.String() {
return nil, fmt.Errorf("expected %v, got %v", expected, request)
}
return nil, nil
}

func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
savedRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = prometheus.NewRegistry()
defer func() {
prometheus.DefaultRegisterer = savedRegistry
}()

grpcPort, closeGrpcPort, err := getLocalHostPort()
require.NoError(t, err)
httpPort, closeHTTPPort, err := getLocalHostPort()
require.NoError(t, err)

err = closeGrpcPort()
require.NoError(t, err)
err = closeHTTPPort()
require.NoError(t, err)

cfg.HTTPListenPort = httpPort
cfg.GRPCListenPort = grpcPort

serv, err := server.New(cfg)
require.NoError(t, err)
register(serv.GRPC)

go func() {
err := serv.Run()
require.NoError(t, err)
}()

defer serv.Shutdown()

grpcHost := fmt.Sprintf("localhost:%d", grpcPort)

clientConfig := grpcclient.Config{}
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

dialOptions, err := clientConfig.DialOption(nil, nil)
assert.NoError(t, err)
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)

conn, err := grpc.NewClient(grpcHost, dialOptions...)
assert.NoError(t, err)
validate(t, conn)
}

func TestConcurrentGrpcCalls(t *testing.T) {
cfg := server.Config{}
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

tc := map[string]struct {
cfg server.Config
register func(s *grpc.Server)
validate func(t *testing.T, con *grpc.ClientConn)
}{
"distributor": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
distributorpb.RegisterDistributorServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := distributorpb.NewDistributorClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
_, err := client.Push(ctx, createRequest(i))
require.NoError(t, err)
}(i)
}

wg.Wait()
},
},
"ingester": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
ingester_client.RegisterIngesterServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := ingester_client.NewIngesterClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
require.NoError(t, err)
resp, err := s.Recv()
require.NoError(t, err)
expected := createStreamResponse(i)
require.Equal(t, expected.String(), resp.String())
}(i)
}

wg.Wait()
},
},
}

for name, c := range tc {
t.Run(name, func(t *testing.T) {
run(t, c.cfg, c.register, c.validate)
})
}
}

func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
{
FromIngesterId: strconv.Itoa(i),
Labels: createLabels(i),
Chunks: []ingester_client.Chunk{
{
StartTimestampMs: int64(i),
EndTimestampMs: int64(i),
Encoding: int32(i),
Data: []byte(strconv.Itoa(i)),
},
},
},
}}
}

func createRequest(i int) *cortexpb.WriteRequest {
labels := createLabels(i)
return &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
{
TimeSeries: &cortexpb.TimeSeries{
Labels: labels,
Samples: []cortexpb.Sample{
{TimestampMs: int64(i), Value: float64(i)},
},
Exemplars: []cortexpb.Exemplar{
{
Labels: labels,
Value: float64(i),
TimestampMs: int64(i),
},
},
},
},
},
}
}

func createLabels(i int) []cortexpb.LabelPair {
labels := make([]cortexpb.LabelPair, 0, 100)
for j := 0; j < 100; j++ {
labels = append(labels, cortexpb.LabelPair{
Name: fmt.Sprintf("test%d_%d", i, j),
Value: fmt.Sprintf("test%d_%d", i, j),
})
}
return labels
}

func getLocalHostPort() (int, func() error, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, nil, err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, nil, err
}

closePort := func() error {
return l.Close()
}
return l.Addr().(*net.TCPAddr).Port, closePort, nil
}
20 changes: 10 additions & 10 deletions pkg/cortexpb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms [
//
// Note: while resulting labels.Labels is supposedly sorted, this function
// doesn't enforce that. If input is not sorted, output will be wrong.
func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels {
func FromLabelAdaptersToLabels(ls []LabelPair) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&ls))
}

// FromLabelAdaptersToLabelsWithCopy converts []LabelAdapter to labels.Labels.
// Do NOT use unsafe to convert between data types because this function may
// get in input labels whose data structure is reused.
func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels {
func FromLabelAdaptersToLabelsWithCopy(input []LabelPair) labels.Labels {
return CopyLabels(FromLabelAdaptersToLabels(input))
}

Expand Down Expand Up @@ -107,29 +107,29 @@ func copyStringToBuffer(in string, buf []byte) (string, []byte) {
// FromLabelsToLabelAdapters casts labels.Labels to []LabelAdapter.
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
// This allows us to use labels.Labels directly in protos.
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelAdapter {
return *(*[]LabelAdapter)(unsafe.Pointer(&ls))
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelPair {
return *(*[]LabelPair)(unsafe.Pointer(&ls))
}

// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric.
// Don't do this on any performance sensitive paths.
func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric {
func FromLabelAdaptersToMetric(ls []LabelPair) model.Metric {
return util.LabelsToMetric(FromLabelAdaptersToLabels(ls))
}

// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric with copy.
// Don't do this on any performance sensitive paths.
func FromLabelAdaptersToMetricWithCopy(ls []LabelAdapter) model.Metric {
func FromLabelAdaptersToMetricWithCopy(ls []LabelPair) model.Metric {
return util.LabelsToMetric(FromLabelAdaptersToLabelsWithCopy(ls))
}

// FromMetricsToLabelAdapters converts model.Metric to []LabelAdapter.
// Don't do this on any performance sensitive paths.
// The result is sorted.
func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
result := make([]LabelAdapter, 0, len(metric))
func FromMetricsToLabelAdapters(metric model.Metric) []LabelPair {
result := make([]LabelPair, 0, len(metric))
for k, v := range metric {
result = append(result, LabelAdapter{
result = append(result, LabelPair{
Name: string(k),
Value: string(v),
})
Expand Down Expand Up @@ -162,7 +162,7 @@ func FromExemplarProtosToExemplars(es []Exemplar) []exemplar.Exemplar {
return result
}

type byLabel []LabelAdapter
type byLabel []LabelPair

func (s byLabel) Len() int { return len(s) }
func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 }
Expand Down
6 changes: 3 additions & 3 deletions pkg/cortexpb/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestMetricMetadataToMetricTypeToMetricType(t *testing.T) {
}

func TestFromLabelAdaptersToLabels(t *testing.T) {
input := []LabelAdapter{{Name: "hello", Value: "world"}}
input := []LabelPair{{Name: "hello", Value: "world"}}
expected := labels.Labels{labels.Label{Name: "hello", Value: "world"}}
actual := FromLabelAdaptersToLabels(input)

Expand All @@ -115,7 +115,7 @@ func TestFromLabelAdaptersToLabels(t *testing.T) {
}

func TestFromLabelAdaptersToLabelsWithCopy(t *testing.T) {
input := []LabelAdapter{{Name: "hello", Value: "world"}}
input := []LabelPair{{Name: "hello", Value: "world"}}
expected := labels.Labels{labels.Label{Name: "hello", Value: "world"}}
actual := FromLabelAdaptersToLabelsWithCopy(input)

Expand All @@ -127,7 +127,7 @@ func TestFromLabelAdaptersToLabelsWithCopy(t *testing.T) {
}

func BenchmarkFromLabelAdaptersToLabelsWithCopy(b *testing.B) {
input := []LabelAdapter{
input := []LabelPair{
{Name: "hello", Value: "world"},
{Name: "some label", Value: "and its value"},
{Name: "long long long long long label name", Value: "perhaps even longer label value, but who's counting anyway?"}}
Expand Down
Loading

0 comments on commit c9e23c3

Please sign in to comment.