diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go index 83e241c20..741b59d03 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go @@ -1,13 +1,20 @@ package loadbalancing_test import ( + "context" + "fmt" + "net" "testing" "time" "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" + "github.com/grafana/dskit/backoff" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configgrpc" @@ -15,6 +22,9 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "google.golang.org/grpc" ) func getPtrToUint(v uint16) *uint16 { @@ -22,6 +32,206 @@ func getPtrToUint(v uint16) *uint16 { return res } +// Test performs a basic integration test which runs the otelcol.exporter.loadbalancing +// component and ensures that it can pass data to an OTLP gRPC server. +func Test(t *testing.T) { + traceCh := make(chan ptrace.Traces) + tracesServer := makeTracesServer(t, traceCh) + + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.loadbalancing") + require.NoError(t, err) + + cfgTemplate := ` + routing_key = "%s" + resolver { + static { + hostnames = ["%s"] + } + } + protocol { + otlp { + client { + compression = "none" + + tls { + insecure = true + insecure_skip_verify = true + } + } + } + } + + debug_metrics { + disable_high_cardinality_metrics = true + } + ` + + cfg := fmt.Sprintf(cfgTemplate, "traceID", tracesServer) + var args loadbalancing.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + require.Equal(t, args.DebugMetricsConfig().DisableHighCardinalityMetrics, true) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } + + // Update the config to disable traces export + cfg = fmt.Sprintf(cfgTemplate, "metric", tracesServer) + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + ctrl.Update(args) + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MaxRetries: 3, + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + require.ErrorContains(t, err, "telemetry type is not supported") + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + // no error here, as we we expect to fail sending in the first place + select { + case <-traceCh: + require.FailNow(t, "no traces expected here") + case <-time.After(time.Second): + } + + // Re-run the test with reenabled traces export + cfg = fmt.Sprintf(cfgTemplate, "traceID", tracesServer) + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + ctrl.Update(args) + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } +} + +// makeTracesServer returns a host:port which will accept traces over insecure +// gRPC. +func makeTracesServer(t *testing.T, ch chan ptrace.Traces) string { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv := grpc.NewServer() + ptraceotlp.RegisterGRPCServer(srv, &mockTracesReceiver{ch: ch}) + + go func() { + err := srv.Serve(lis) + require.NoError(t, err) + }() + t.Cleanup(srv.Stop) + + return lis.Addr().String() +} + +type mockTracesReceiver struct { + ptraceotlp.UnimplementedGRPCServer + ch chan ptrace.Traces +} + +var _ ptraceotlp.GRPCServer = (*mockTracesReceiver)(nil) + +func (ms *mockTracesReceiver) Export(_ context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { + ms.ch <- req.Traces() + return ptraceotlp.NewExportResponse(), nil +} + +func createTestTraces() ptrace.Traces { + // Matches format from the protobuf definition: + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto + bb := `{ + "resource_spans": [{ + "scope_spans": [{ + "spans": [{ + "name": "TestSpan" + }] + }] + }] + }` + + decoder := &ptrace.JSONUnmarshaler{} + data, err := decoder.UnmarshalTraces([]byte(bb)) + if err != nil { + panic(err) + } + return data +} + func TestConfigConversion(t *testing.T) { var ( defaultRetrySettings = configretry.NewDefaultBackOffConfig()