diff --git a/pkg/agent/tcpconn/integration_test.go b/pkg/agent/tcpconn/integration_test.go new file mode 100644 index 00000000..a541d4d1 --- /dev/null +++ b/pkg/agent/tcpconn/integration_test.go @@ -0,0 +1,215 @@ +//go:build integration +// +build integration + +package tcpconn_test + +import ( + "bufio" + "context" + "fmt" + "math/rand" + "net" + "path/filepath" + "testing" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/go-connections/nat" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +const echoServerPort = "6666" + +func tcpDisruption(port string, duration time.Duration, rate float64) []string { + return []string{ + "xk6-disruptor-agent", "tcp-drop", "-d", fmt.Sprint(duration), "--port", port, "--rate", fmt.Sprint(rate), + } +} + +const disruptWithRate = "xk6-disruptor-agent tcp-drop -d 1h --port 6666 --rate %f" + +// Test_DropsConnections tests that a number of connections to a target server fail according to rate. +func Test_DropsConnections(t *testing.T) { + t.Parallel() + + const rate = 0.5 + + ctx := context.TODO() + + echoserver, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ProviderType: testcontainers.ProviderDocker, + ContainerRequest: testcontainers.ContainerRequest{ + ExposedPorts: []string{echoServerPort}, + FromDockerfile: testcontainers.FromDockerfile{ + Dockerfile: "Dockerfile", + Context: filepath.Join("..", "..", "..", "testcontainers", "echoserver"), + }, + WaitingFor: wait.ForExposedPort(), + }, + Started: true, + }) + if err != nil { + t.Fatalf("creating echoserver container: %v", err) + } + + t.Cleanup(func() { + echoserver.Terminate(ctx) + }) + + agentSidecar, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ProviderType: testcontainers.ProviderDocker, + ContainerRequest: testcontainers.ContainerRequest{ + Image: "ghcr.io/grafana/xk6-disruptor-agent", + Cmd: tcpDisruption(echoServerPort, time.Hour, rate), + Privileged: true, + WaitingFor: wait.ForExec([]string{"pgrep", "xk6-disruptor-agent"}), + NetworkMode: container.NetworkMode("container:" + echoserver.GetContainerID()), + }, + Started: true, + }) + if err != nil { + t.Fatalf("creating agent container: %v", err) + } + + t.Cleanup(func() { + agentSidecar.Terminate(ctx) + }) + + port, err := echoserver.MappedPort(ctx, nat.Port(echoServerPort)) + if err != nil { + t.Fatalf("getting echoserver mapped port: %v", err) + } + + // TODO: Find a better way to wait for disruption to start. + time.Sleep(time.Second) + + errors := make(chan error) + + const nTests = 500 + for i := 0; i < nTests; i++ { + go func() { + time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) + errors <- testEcho(net.JoinHostPort("localhost", port.Port())) + }() + } + + nErrs := 0.0 + for i := 0; i < nTests; i++ { + if err := <-errors; err != nil { + nErrs++ + } + } + + // We expect nTests * rate errors, but we will accept +-10%. + min := nTests * rate * 0.9 + max := nTests * rate * 1.1 + + if nErrs < min || nErrs > max { + t.Fatalf("got %f errors, expected %f<%f<%f", nErrs, min, nTests*rate, max) + } + + t.Logf("Got %f errors", nErrs) +} + +// Test_StopsDroppingConnections tests that after the disruption ends, no connections are terminated. +func Test_StopsDroppingConnections(t *testing.T) { + t.Parallel() + + const rate = 1 + + ctx := context.TODO() + + echoserver, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ProviderType: testcontainers.ProviderDocker, + ContainerRequest: testcontainers.ContainerRequest{ + ExposedPorts: []string{echoServerPort}, + FromDockerfile: testcontainers.FromDockerfile{ + Dockerfile: "Dockerfile", + Context: filepath.Join("..", "..", "..", "testcontainers", "echoserver"), + }, + WaitingFor: wait.ForExposedPort(), + }, + Started: true, + }) + if err != nil { + t.Fatalf("creating echoserver container: %v", err) + } + + t.Cleanup(func() { + echoserver.Terminate(ctx) + }) + + agentSidecar, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ProviderType: testcontainers.ProviderDocker, + ContainerRequest: testcontainers.ContainerRequest{ + Image: "ghcr.io/grafana/xk6-disruptor-agent", + Cmd: tcpDisruption(echoServerPort, 3*time.Second, rate), + Privileged: true, + WaitingFor: wait.ForExec([]string{"pgrep", "xk6-disruptor-agent"}), + NetworkMode: container.NetworkMode("container:" + echoserver.GetContainerID()), + }, + Started: true, + }) + if err != nil { + t.Fatalf("creating agent container: %v", err) + } + + t.Cleanup(func() { + agentSidecar.Terminate(ctx) + }) + + port, err := echoserver.MappedPort(ctx, nat.Port(echoServerPort)) + if err != nil { + t.Fatalf("getting echoserver mapped port: %v", err) + } + + // Wait until the disruption has ended. + time.Sleep(5 * time.Second) + + errors := make(chan error) + + const nTests = 5 + for i := 0; i < nTests; i++ { + go func() { + time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) + errors <- testEcho(net.JoinHostPort("localhost", port.Port())) + }() + } + + for i := 0; i < nTests; i++ { + if err := <-errors; err != nil { + t.Errorf("Error connecting to echoserver: %v", err) + } + } +} + +// testEcho attempts to connect to an echoserver at address, sends a line, and checks that it received the same line +// back. +func testEcho(address string) error { + const line = "I am a test!\n" + + conn, err := net.Dial("tcp", address) + if err != nil { + return fmt.Errorf("connecting : %w", err) + } + + defer conn.Close() + + _, err = conn.Write([]byte(line)) + if err != nil { + return fmt.Errorf("writing string: %w", err) + } + + reader := bufio.NewReader(conn) + echoed, err := reader.ReadString('\n') + if err != nil { + return fmt.Errorf("reading back: %w", err) + } + + if echoed != line { + return fmt.Errorf("echoed string %q does not match sent %q", echoed, line) + } + + return nil +}