Skip to content

Commit

Permalink
agent/tcpconn: add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
nadiamoe committed Oct 24, 2023
1 parent f9e1660 commit 057ac2f
Showing 1 changed file with 215 additions and 0 deletions.
215 changes: 215 additions & 0 deletions pkg/agent/tcpconn/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
//go:build integration
// +build integration

package tcpconn_test

import (
"bufio"
"context"
"fmt"
"math/rand"
"net"
"path/filepath"
"testing"
"time"

"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

const echoServerPort = "6666"

func tcpDisruption(port string, duration time.Duration, rate float64) []string {
return []string{
"xk6-disruptor-agent", "tcp-drop", "-d", fmt.Sprint(duration), "--port", port, "--rate", fmt.Sprint(rate),
}
}

const disruptWithRate = "xk6-disruptor-agent tcp-drop -d 1h --port 6666 --rate %f"

// Test_DropsConnections tests that a number of connections to a target server fail according to rate.
func Test_DropsConnections(t *testing.T) {
t.Parallel()

const rate = 0.5

ctx := context.TODO()

echoserver, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ProviderType: testcontainers.ProviderDocker,
ContainerRequest: testcontainers.ContainerRequest{
ExposedPorts: []string{echoServerPort},
FromDockerfile: testcontainers.FromDockerfile{
Dockerfile: "Dockerfile",
Context: filepath.Join("..", "..", "..", "testcontainers", "echoserver"),
},
WaitingFor: wait.ForExposedPort(),
},
Started: true,
})
if err != nil {
t.Fatalf("creating echoserver container: %v", err)
}

t.Cleanup(func() {
echoserver.Terminate(ctx)
})

agentSidecar, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ProviderType: testcontainers.ProviderDocker,
ContainerRequest: testcontainers.ContainerRequest{
Image: "ghcr.io/grafana/xk6-disruptor-agent",
Cmd: tcpDisruption(echoServerPort, time.Hour, rate),
Privileged: true,
WaitingFor: wait.ForExec([]string{"pgrep", "xk6-disruptor-agent"}),
NetworkMode: container.NetworkMode("container:" + echoserver.GetContainerID()),
},
Started: true,
})
if err != nil {
t.Fatalf("creating agent container: %v", err)
}

t.Cleanup(func() {
agentSidecar.Terminate(ctx)
})

port, err := echoserver.MappedPort(ctx, nat.Port(echoServerPort))
if err != nil {
t.Fatalf("getting echoserver mapped port: %v", err)
}

// TODO: Find a better way to wait for disruption to start.
time.Sleep(time.Second)

errors := make(chan error)

const nTests = 500
for i := 0; i < nTests; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
errors <- testEcho(net.JoinHostPort("localhost", port.Port()))
}()
}

nErrs := 0.0
for i := 0; i < nTests; i++ {
if err := <-errors; err != nil {
nErrs++
}
}

// We expect nTests * rate errors, but we will accept +-10%.
min := nTests * rate * 0.9
max := nTests * rate * 1.1

if nErrs < min || nErrs > max {
t.Fatalf("got %f errors, expected %f<%f<%f", nErrs, min, nTests*rate, max)
}

t.Logf("Got %f errors", nErrs)
}

// Test_StopsDroppingConnections tests that after the disruption ends, no connections are terminated.
func Test_StopsDroppingConnections(t *testing.T) {
t.Parallel()

const rate = 1

ctx := context.TODO()

echoserver, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ProviderType: testcontainers.ProviderDocker,
ContainerRequest: testcontainers.ContainerRequest{
ExposedPorts: []string{echoServerPort},
FromDockerfile: testcontainers.FromDockerfile{
Dockerfile: "Dockerfile",
Context: filepath.Join("..", "..", "..", "testcontainers", "echoserver"),
},
WaitingFor: wait.ForExposedPort(),
},
Started: true,
})
if err != nil {
t.Fatalf("creating echoserver container: %v", err)
}

t.Cleanup(func() {
echoserver.Terminate(ctx)
})

agentSidecar, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ProviderType: testcontainers.ProviderDocker,
ContainerRequest: testcontainers.ContainerRequest{
Image: "ghcr.io/grafana/xk6-disruptor-agent",
Cmd: tcpDisruption(echoServerPort, 3*time.Second, rate),
Privileged: true,
WaitingFor: wait.ForExec([]string{"pgrep", "xk6-disruptor-agent"}),
NetworkMode: container.NetworkMode("container:" + echoserver.GetContainerID()),
},
Started: true,
})
if err != nil {
t.Fatalf("creating agent container: %v", err)
}

t.Cleanup(func() {
agentSidecar.Terminate(ctx)
})

port, err := echoserver.MappedPort(ctx, nat.Port(echoServerPort))
if err != nil {
t.Fatalf("getting echoserver mapped port: %v", err)
}

// Wait until the disruption has ended.
time.Sleep(5 * time.Second)

errors := make(chan error)

const nTests = 5
for i := 0; i < nTests; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
errors <- testEcho(net.JoinHostPort("localhost", port.Port()))
}()
}

for i := 0; i < nTests; i++ {
if err := <-errors; err != nil {
t.Errorf("Error connecting to echoserver: %v", err)
}
}
}

// testEcho attempts to connect to an echoserver at address, sends a line, and checks that it received the same line
// back.
func testEcho(address string) error {
const line = "I am a test!\n"

conn, err := net.Dial("tcp", address)
if err != nil {
return fmt.Errorf("connecting : %w", err)
}

defer conn.Close()

_, err = conn.Write([]byte(line))
if err != nil {
return fmt.Errorf("writing string: %w", err)
}

reader := bufio.NewReader(conn)
echoed, err := reader.ReadString('\n')
if err != nil {
return fmt.Errorf("reading back: %w", err)
}

if echoed != line {
return fmt.Errorf("echoed string %q does not match sent %q", echoed, line)
}

return nil
}

0 comments on commit 057ac2f

Please sign in to comment.