From 75eb36eca684e4d3b67d81a7ea4f8cd1be653275 Mon Sep 17 00:00:00 2001 From: pablochacin Date: Mon, 23 Oct 2023 15:30:40 +0200 Subject: [PATCH] Unify fault injection commands (#356) * Add IntOrStr type * Use IntOrString for fault ports * Unify pod and service fault injection commands Signed-off-by: Pablo Chacin --- e2e/disruptors/pod_e2e_test.go | 14 +-- e2e/disruptors/service_e2e_test.go | 7 +- pkg/api/convert.go | 39 ++++++- pkg/api/convert_test.go | 71 +++++++++---- pkg/disruptors/commads.go | 93 ++++------------- pkg/disruptors/commads_test.go | 25 ++--- pkg/disruptors/pod.go | 10 +- pkg/disruptors/protocol.go | 6 +- pkg/disruptors/service.go | 25 ++++- pkg/types/intstr/intstr.go | 73 ++++++++++++++ pkg/types/intstr/intstr_test.go | 131 ++++++++++++++++++++++++ pkg/utils/kubernetes.go | 69 +++++-------- pkg/utils/kubernetes_test.go | 157 ++++++++++++++++------------- 13 files changed, 476 insertions(+), 244 deletions(-) create mode 100644 pkg/types/intstr/intstr.go create mode 100644 pkg/types/intstr/intstr_test.go diff --git a/e2e/disruptors/pod_e2e_test.go b/e2e/disruptors/pod_e2e_test.go index aa29c22a..8cfd7932 100644 --- a/e2e/disruptors/pod_e2e_test.go +++ b/e2e/disruptors/pod_e2e_test.go @@ -12,8 +12,10 @@ import ( "time" "github.com/grafana/xk6-disruptor/pkg/agent/protocol" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" + corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" + k8sintstr "k8s.io/apimachinery/pkg/util/intstr" "github.com/grafana/xk6-disruptor/pkg/disruptors" "github.com/grafana/xk6-disruptor/pkg/kubernetes" @@ -71,7 +73,7 @@ func Test_PodDisruptor(t *testing.T) { port: 80, injector: func(d disruptors.PodDisruptor) error { fault := disruptors.HTTPFault{ - Port: 80, + Port: intstr.FromInt32(80), ErrorRate: 1.0, ErrorCode: 500, } @@ -97,7 +99,7 @@ func Test_PodDisruptor(t *testing.T) { port: 9000, injector: func(d disruptors.PodDisruptor) error { fault := disruptors.GrpcFault{ - Port: 9000, + Port: intstr.FromInt32(9000), ErrorRate: 1.0, StatusCode: 14, Exclude: "grpc.reflection.v1alpha.ServerReflection,grpc.reflection.v1.ServerReflection", @@ -135,7 +137,7 @@ func Test_PodDisruptor(t *testing.T) { namespace, tc.pod, tc.service, - intstr.FromInt(tc.port), + k8sintstr.FromInt(tc.port), 30*time.Second, ) if err != nil { @@ -225,7 +227,7 @@ func Test_PodDisruptor(t *testing.T) { namespace, fixtures.BuildHttpbinPod(), service, - intstr.FromInt(80), + k8sintstr.FromInt(80), 30*time.Second, ) if err != nil { @@ -251,7 +253,7 @@ func Test_PodDisruptor(t *testing.T) { } fault := disruptors.HTTPFault{ - Port: 80, + Port: intstr.FromInt32(80), ErrorRate: 1.0, ErrorCode: 500, } diff --git a/e2e/disruptors/service_e2e_test.go b/e2e/disruptors/service_e2e_test.go index 4a546bd5..5afa0c06 100644 --- a/e2e/disruptors/service_e2e_test.go +++ b/e2e/disruptors/service_e2e_test.go @@ -9,7 +9,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" + k8sintstr "k8s.io/apimachinery/pkg/util/intstr" "github.com/grafana/xk6-disruptor/pkg/disruptors" "github.com/grafana/xk6-disruptor/pkg/kubernetes" @@ -18,6 +18,7 @@ import ( "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/deploy" "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/fixtures" "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubernetes/namespace" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" ) func Test_ServiceDisruptor(t *testing.T) { @@ -65,7 +66,7 @@ func Test_ServiceDisruptor(t *testing.T) { port: 80, injector: func(d disruptors.ServiceDisruptor) error { fault := disruptors.HTTPFault{ - Port: 80, + Port: intstr.FromInt32(80), ErrorRate: 1.0, ErrorCode: 500, } @@ -100,7 +101,7 @@ func Test_ServiceDisruptor(t *testing.T) { namespace, tc.pod, tc.service, - intstr.FromInt(tc.port), + k8sintstr.FromInt(tc.port), 30*time.Second, ) if err != nil { diff --git a/pkg/api/convert.go b/pkg/api/convert.go index ce8de925..04786061 100644 --- a/pkg/api/convert.go +++ b/pkg/api/convert.go @@ -2,8 +2,11 @@ package api import ( "fmt" + "math" "reflect" "time" + + "github.com/grafana/xk6-disruptor/pkg/types/intstr" ) // Convert converts from a generic object received from the JS interface via goja into a go type. @@ -17,6 +20,7 @@ import ( // string <-- string // time.Duration <-- string // time.Time <-- string (only in RFC3339 format) +// IntOrStr <-- string or int64 (only supports int32 values) // // (1) TODO: support other key types, such as numeric and attempt conversion from the string key func Convert(value interface{}, target interface{}) error { @@ -47,6 +51,14 @@ func Convert(value interface{}, target interface{}) error { if targetValue.Type().String() == "time.Duration" { return convertDuration(value, target) } + + if targetValue.Type().String() == "intstr.IntOrString" { + return convertIntOrString(value, target) + } + case reflect.String: + if targetValue.Type().String() == "intstr.IntOrString" { + return convertIntOrString(value, target) + } } // try default conversions @@ -147,8 +159,7 @@ func convertDuration(value interface{}, target interface{}) error { } targetValue.Set(reflect.ValueOf(duration)) - - return err + return nil } func convertTime(value interface{}, target interface{}) error { @@ -167,3 +178,27 @@ func convertTime(value interface{}, target interface{}) error { targetValue.Set(reflect.ValueOf(timeValue)) return nil } + +func convertIntOrString(value interface{}, target interface{}) error { + targetValue := reflect.ValueOf(target).Elem() + + int64Value, ok := value.(int64) + if ok { + // check overflow here to avoid panic in the conversion + if int64Value > math.MaxInt32 || int64Value < math.MinInt32 { + return fmt.Errorf("value overflows int32 range: %d", int64Value) + } + intOrStrValue := intstr.FromInt32(int32(int64Value)) + targetValue.Set(reflect.ValueOf(intOrStrValue)) + return nil + } + + stringValue, ok := value.(string) + if ok { + intOrStrValue := intstr.FromString(stringValue) + targetValue.Set(reflect.ValueOf(intOrStrValue)) + return nil + } + + return fmt.Errorf("expected int or string value got %s", reflect.TypeOf(value)) +} diff --git a/pkg/api/convert_test.go b/pkg/api/convert_test.go index a4dd361b..5e836655 100644 --- a/pkg/api/convert_test.go +++ b/pkg/api/convert_test.go @@ -4,6 +4,8 @@ import ( "reflect" "testing" "time" + + "github.com/grafana/xk6-disruptor/pkg/types/intstr" ) func Test_Conversions(t *testing.T) { @@ -13,13 +15,15 @@ func Test_Conversions(t *testing.T) { SubfieldString string } type TypedFields struct { - FieldString string - FieldDuration time.Duration - FieldInt int64 - FieldFloat float64 - FieldStruct StructField - FieldMap map[string]string - FieldArray []string + IntOrStrStr intstr.IntOrString + IntOrStrInt intstr.IntOrString + String string + Duration time.Duration + Int int64 + Float float64 + Struct StructField + Map map[string]string + Array []string } testCases := []struct { @@ -43,6 +47,27 @@ func Test_Conversions(t *testing.T) { expected: int64(1), expectError: false, }, + { + description: "IntOrString int conversion", + value: int64(1), + target: new(intstr.IntOrString), + expected: intstr.FromInt32(1), + expectError: false, + }, + { + description: "IntOrString string to int conversion", + value: "1", + target: new(intstr.IntOrString), + expected: intstr.FromInt32(1), + expectError: false, + }, + { + description: "IntOrString string conversion", + value: "one", + target: new(intstr.IntOrString), + expected: intstr.FromString("one"), + expectError: false, + }, { description: "Float to Int conversion", value: float64(1.0), @@ -153,31 +178,35 @@ func Test_Conversions(t *testing.T) { { description: "Struct field conversion", value: map[string]interface{}{ - "fieldString": "string", - "fieldInt": int64(1), - "fieldDuration": "1s", - "fieldFloat": float64(1.0), - "fieldStruct": map[string]interface{}{ + "intOrStrStr": "uno", + "intOrStrInt": int64(1), + "string": "string", + "int": int64(1), + "duration": "1s", + "float": float64(1.0), + "struct": map[string]interface{}{ "subfieldInt": int64(0), "subfieldString": "string", }, - "fieldMap": map[string]interface{}{ + "map": map[string]interface{}{ "key": "value", }, - "fieldArray": []interface{}{"string"}, + "array": []interface{}{"string"}, }, target: &TypedFields{}, expected: TypedFields{ - FieldString: "string", - FieldInt: 1, - FieldDuration: time.Second, - FieldFloat: 1.0, - FieldStruct: StructField{ + IntOrStrStr: intstr.FromString("uno"), + IntOrStrInt: intstr.FromInt32(1), + String: "string", + Int: 1, + Duration: time.Second, + Float: 1.0, + Struct: StructField{ SubfieldInt: 0, SubfieldString: "string", }, - FieldArray: []string{"string"}, - FieldMap: map[string]string{ + Array: []string{"string"}, + Map: map[string]string{ "key": "value", }, }, diff --git a/pkg/disruptors/commads.go b/pkg/disruptors/commads.go index add27303..7dcde9a2 100644 --- a/pkg/disruptors/commads.go +++ b/pkg/disruptors/commads.go @@ -4,7 +4,9 @@ import ( "fmt" "time" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" "github.com/grafana/xk6-disruptor/pkg/utils" + corev1 "k8s.io/api/core/v1" ) @@ -22,8 +24,8 @@ func buildGrpcFaultCmd( } // TODO: make port mandatory - if fault.Port != 0 { - cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) + if fault.Port != intstr.NullValue { + cmd = append(cmd, "-t", fault.Port.Str()) } if fault.AverageDelay > 0 { @@ -75,8 +77,8 @@ func buildHTTPFaultCmd( } // TODO: make port mandatory - if fault.Port != 0 { - cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) + if fault.Port != intstr.NullValue { + cmd = append(cmd, "-t", fault.Port.Str()) } if fault.AverageDelay > 0 { @@ -129,13 +131,17 @@ type PodHTTPFaultCommand struct { // Commands return the command for injecting a HttpFault in a Pod func (c PodHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - if !utils.HasPort(pod, c.fault.Port) { - return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, c.fault.Port) + if utils.HasHostNetwork(pod) { + return VisitCommands{}, fmt.Errorf("fault cannot be safely injected because pod %q uses hostNetwork", pod.Name) } - if utils.HasHostNetwork(pod) { - return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + // find the container port for fault injection + port, err := utils.FindPort(c.fault.Port, pod) + if err != nil { + return VisitCommands{}, err } + podFault := c.fault + podFault.Port = port targetAddress, err := utils.PodIP(pod) if err != nil { @@ -143,7 +149,7 @@ func (c PodHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { } return VisitCommands{ - Exec: buildHTTPFaultCmd(targetAddress, c.fault, c.duration, c.options), + Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options), Cleanup: buildCleanupCmd(), }, nil } @@ -157,76 +163,15 @@ type PodGrpcFaultCommand struct { // Commands return the command for injecting a GrpcFault in a Pod func (c PodGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - if !utils.HasPort(pod, c.fault.Port) { - return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, c.fault.Port) - } - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - return VisitCommands{ - Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options), - Cleanup: buildCleanupCmd(), - }, nil -} - -// ServiceHTTPFaultCommand implements the PodVisitCommands interface for injecting HttpFaults in a Pod -type ServiceHTTPFaultCommand struct { - service corev1.Service - fault HTTPFault - duration time.Duration - options HTTPDisruptionOptions -} - -// Commands return the command for injecting a HttpFault in a Service -func (c ServiceHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - port, err := utils.MapPort(c.service, c.fault.Port, pod) - if err != nil { - return VisitCommands{}, err - } - if utils.HasHostNetwork(pod) { - return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + return VisitCommands{}, fmt.Errorf("fault cannot be safely injected because pod %q uses hostNetwork", pod.Name) } - // copy fault to change target port for the pod - podFault := c.fault - podFault.Port = port - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - return VisitCommands{ - Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options), - Cleanup: buildCleanupCmd(), - }, nil -} - -// Cleanup defines the command to execute for cleaning up if command execution fails -func (c ServiceHTTPFaultCommand) Cleanup(_ corev1.Pod) []string { - return buildCleanupCmd() -} - -// ServiceGrpcFaultCommand implements the PodVisitCommands interface for injecting a -// GrpcFault in a Service -type ServiceGrpcFaultCommand struct { - service corev1.Service - fault GrpcFault - duration time.Duration - options GrpcDisruptionOptions -} - -// Commands return the VisitCommands for injecting a GrpcFault in a Service -func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - port, err := utils.MapPort(c.service, c.fault.Port, pod) + // find the container port for fault injection + port, err := utils.FindPort(c.fault.Port, pod) if err != nil { return VisitCommands{}, err } - podFault := c.fault podFault.Port = port @@ -236,7 +181,7 @@ func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) } return VisitCommands{ - Exec: buildGrpcFaultCmd(targetAddress, podFault, c.duration, c.options), + Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options), Cleanup: buildCleanupCmd(), }, nil } diff --git a/pkg/disruptors/commads_test.go b/pkg/disruptors/commads_test.go index 850b49d5..af996506 100644 --- a/pkg/disruptors/commads_test.go +++ b/pkg/disruptors/commads_test.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/xk6-disruptor/pkg/testutils/command" "github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" corev1 "k8s.io/api/core/v1" ) @@ -44,7 +45,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { fault: HTTPFault{ ErrorRate: 0.1, ErrorCode: 500, - Port: 80, + Port: intstr.FromInt32(80), }, opts: HTTPDisruptionOptions{}, duration: 60 * time.Second, @@ -64,7 +65,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { ErrorRate: 0.1, ErrorCode: 500, ErrorBody: "{\"error\": 500}", - Port: 80, + Port: intstr.FromInt32(80), }, opts: HTTPDisruptionOptions{}, duration: 60 * time.Second, @@ -77,7 +78,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { cmdError: nil, fault: HTTPFault{ AverageDelay: 100 * time.Millisecond, - Port: 80, + Port: intstr.FromInt32(80), }, opts: HTTPDisruptionOptions{}, duration: 60 * time.Second, @@ -90,7 +91,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { cmdError: nil, fault: HTTPFault{ Exclude: "/path1,/path2", - Port: 80, + Port: intstr.FromInt32(80), }, opts: HTTPDisruptionOptions{}, duration: 60 * time.Second, @@ -101,7 +102,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { expectedCmd: "", expectError: true, fault: HTTPFault{ - Port: 8080, + Port: intstr.FromInt32(8080), }, opts: HTTPDisruptionOptions{}, duration: 60, @@ -120,7 +121,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { expectedCmd: "", expectError: true, fault: HTTPFault{ - Port: 80, + Port: intstr.FromInt32(80), }, opts: HTTPDisruptionOptions{}, duration: 60, @@ -141,7 +142,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) { expectedCmd: "", expectError: true, fault: HTTPFault{ - Port: 80, + Port: intstr.FromInt32(80), }, opts: HTTPDisruptionOptions{}, duration: 60, @@ -196,7 +197,7 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) { fault: GrpcFault{ ErrorRate: 0.1, StatusCode: 14, - Port: 3000, + Port: intstr.FromInt32(3000), }, opts: GrpcDisruptionOptions{}, duration: 60 * time.Second, @@ -211,7 +212,7 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) { ErrorRate: 0.1, StatusCode: 14, StatusMessage: "internal error", - Port: 3000, + Port: intstr.FromInt32(3000), }, opts: GrpcDisruptionOptions{}, duration: 60 * time.Second, @@ -224,7 +225,7 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) { target: buildPodWithPort("my-app-pod", "grpc", 3000), fault: GrpcFault{ AverageDelay: 100 * time.Millisecond, - Port: 3000, + Port: intstr.FromInt32(3000), }, opts: GrpcDisruptionOptions{}, duration: 60 * time.Second, @@ -237,7 +238,7 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) { target: buildPodWithPort("my-app-pod", "grpc", 3000), fault: GrpcFault{ Exclude: "service1,service2", - Port: 3000, + Port: intstr.FromInt32(3000), }, opts: GrpcDisruptionOptions{}, duration: 60 * time.Second, @@ -249,7 +250,7 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) { title: "Container port not found", target: buildPodWithPort("my-app-pod", "grpc", 3000), expectError: true, - fault: GrpcFault{Port: 8080}, + fault: GrpcFault{Port: intstr.FromInt32(8080)}, opts: GrpcDisruptionOptions{}, duration: 60, }, diff --git a/pkg/disruptors/pod.go b/pkg/disruptors/pod.go index 53b4be5b..b4e8a943 100644 --- a/pkg/disruptors/pod.go +++ b/pkg/disruptors/pod.go @@ -11,17 +11,18 @@ import ( "github.com/grafana/xk6-disruptor/pkg/kubernetes" "github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// DefaultTargetPort defines default target port if not specified in Fault -const DefaultTargetPort = 80 - // ErrSelectorNoPods is returned by NewPodDisruptor when the selector passed to it does not match any pod in the // cluster. var ErrSelectorNoPods = errors.New("no pods found matching selector") +// DefaultTargetPort defines the default value for a target HTTP +var DefaultTargetPort = intstr.FromInt32(80) //nolint:gochecknoglobals + // PodDisruptor defines the types of faults that can be injected in a Pod type PodDisruptor interface { Disruptor @@ -155,8 +156,9 @@ func (d *podDisruptor) InjectHTTPFaults( duration time.Duration, options HTTPDisruptionOptions, ) error { + // Handle default port mapping // TODO: make port mandatory instead of using a default - if fault.Port == 0 { + if fault.Port.IsNull() || fault.Port.IsZero() { fault.Port = DefaultTargetPort } diff --git a/pkg/disruptors/protocol.go b/pkg/disruptors/protocol.go index f301a6fa..cc387efb 100644 --- a/pkg/disruptors/protocol.go +++ b/pkg/disruptors/protocol.go @@ -3,6 +3,8 @@ package disruptors import ( "context" "time" + + "github.com/grafana/xk6-disruptor/pkg/types/intstr" ) // ProtocolFaultInjector defines the methods for injecting protocol faults @@ -30,7 +32,7 @@ type GrpcDisruptionOptions struct { // HTTPFault specifies a fault to be injected in http requests type HTTPFault struct { // port the disruptions will be applied to - Port uint + Port intstr.IntOrString // Average delay introduced to requests AverageDelay time.Duration `js:"averageDelay"` // Variation in the delay (with respect of the average delay) @@ -48,7 +50,7 @@ type HTTPFault struct { // GrpcFault specifies a fault to be injected in grpc requests type GrpcFault struct { // port the disruptions will be applied to - Port uint + Port intstr.IntOrString // Average delay introduced to requests AverageDelay time.Duration `js:"averageDelay"` // Variation in the delay (with respect of the average delay) diff --git a/pkg/disruptors/service.go b/pkg/disruptors/service.go index d2938e2e..47bee3fe 100644 --- a/pkg/disruptors/service.go +++ b/pkg/disruptors/service.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/xk6-disruptor/pkg/kubernetes" "github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers" + "github.com/grafana/xk6-disruptor/pkg/utils" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,9 +84,16 @@ func (d *serviceDisruptor) InjectHTTPFaults( duration time.Duration, options HTTPDisruptionOptions, ) error { - command := ServiceHTTPFaultCommand{ - service: d.service, - fault: fault, + // Map service port to a target pod port + port, err := utils.GetTargetPort(d.service, fault.Port) + if err != nil { + return err + } + podFault := fault + podFault.Port = port + + command := PodHTTPFaultCommand{ + fault: podFault, duration: duration, options: options, } @@ -105,8 +113,15 @@ func (d *serviceDisruptor) InjectGrpcFaults( duration time.Duration, options GrpcDisruptionOptions, ) error { - command := ServiceGrpcFaultCommand{ - service: d.service, + // Map service port to a target pod port + port, err := utils.GetTargetPort(d.service, fault.Port) + if err != nil { + return err + } + podFault := fault + podFault.Port = port + + command := PodGrpcFaultCommand{ fault: fault, duration: duration, options: options, diff --git a/pkg/types/intstr/intstr.go b/pkg/types/intstr/intstr.go new file mode 100644 index 00000000..efb37215 --- /dev/null +++ b/pkg/types/intstr/intstr.go @@ -0,0 +1,73 @@ +// Package intstr implements a custom type for handling values that can be either a string or an int32 +package intstr + +import ( + "fmt" + "strconv" +) + +// ValueType defines the type of a IntOrString value +type ValueType int + +const ( + // ValueTypeInt is a IntOrString that represents an integer value + ValueTypeInt ValueType = iota + // ValueTypeString is a IntOrString that represents a string value + ValueTypeString +) + +// IntOrString holds a value that can be either a string or a int +type IntOrString string + +// NullValue is an empty IntOrString value +const NullValue = IntOrString("") + +// Type returns the ValueType of a IntOrString value +func (value IntOrString) Type() ValueType { + if _, err := strconv.Atoi(string(value)); err == nil { + return ValueTypeInt + } + return ValueTypeString +} + +// IsInt returns true if the value is an integer +func (value IntOrString) IsInt() bool { + return value.Type() == ValueTypeInt +} + +// IsZero checks if the IntOrString value is an integer 0 +func (value IntOrString) IsZero() bool { + return value.IsInt() && value.Int32() == 0 +} + +// IsNull checks if the IntOrString value is the Int NullValue +func (value IntOrString) IsNull() bool { + return value == NullValue +} + +// Int32 returns the value of the IntOrString as an int32. +// If the current value is not an string, 0 is returned +func (value IntOrString) Int32() int32 { + int64Value, err := strconv.ParseInt(string(value), 10, 32) + if err != nil { + panic(fmt.Errorf("invalid int32 value %s", value)) + } + + return int32(int64Value) +} + +// Str returns the value of the IntOrString as a string. +func (value IntOrString) Str() string { + return string(value) +} + +// FromInt32 return a IntOrString from a int32 +func FromInt32(value int32) IntOrString { + strValue := fmt.Sprintf("%d", value) + return IntOrString(strValue) +} + +// FromString return a IntOrString from a string +func FromString(value string) IntOrString { + return IntOrString(value) +} diff --git a/pkg/types/intstr/intstr_test.go b/pkg/types/intstr/intstr_test.go new file mode 100644 index 00000000..bbd0b39e --- /dev/null +++ b/pkg/types/intstr/intstr_test.go @@ -0,0 +1,131 @@ +package intstr + +import ( + "testing" +) + +func Test_IntStrFrom(t *testing.T) { + t.Parallel() + + testCases := []struct { + title string + value interface{} + function func(interface{}) interface{} + expected interface{} + shouldPanic bool + }{ + { + title: "fromString string value", + value: "uno", + function: func(value interface{}) interface{} { + strValue, _ := value.(string) + return FromString(strValue) + }, + expected: IntOrString("uno"), + }, + { + title: "fromString numeric value", + value: "1", + function: func(value interface{}) interface{} { + strValue, _ := value.(string) + return FromString(strValue) + }, + expected: IntOrString("1"), + }, + { + title: "fromInt32", + value: int32(1), + function: func(value interface{}) interface{} { + int32Value, _ := value.(int32) + return FromInt32(int32Value) + }, + expected: IntOrString("1"), + }, + { + title: "Int32", + value: IntOrString("1"), + function: func(value interface{}) interface{} { + intOrStrValue, _ := value.(IntOrString) + return intOrStrValue.Int32() + }, + expected: int32(1), + }, + { + title: "Int32 overflow", + value: IntOrString("9223372036854775807"), + function: func(value interface{}) interface{} { + intOrStrValue, _ := value.(IntOrString) + return intOrStrValue.Int32() + }, + expected: nil, + shouldPanic: true, + }, + { + title: "Int32 form string value", + value: IntOrString("uno"), + function: func(value interface{}) interface{} { + intOrStrValue, _ := value.(IntOrString) + return intOrStrValue.Int32() + }, + expected: nil, + shouldPanic: true, + }, + { + title: "Int32 form nul value", + value: IntOrString(""), + function: func(value interface{}) interface{} { + intOrStrValue, _ := value.(IntOrString) + return intOrStrValue.Int32() + }, + expected: nil, + shouldPanic: true, + }, + { + title: "String form string", + value: IntOrString("uno"), + function: func(value interface{}) interface{} { + intOrStrValue, _ := value.(IntOrString) + return intOrStrValue.Str() + }, + expected: "uno", + shouldPanic: false, + }, + { + title: "String form nul value", + value: IntOrString(""), + function: func(value interface{}) interface{} { + intOrStrValue, _ := value.(IntOrString) + return intOrStrValue.Str() + }, + expected: "", + shouldPanic: false, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.title, func(t *testing.T) { + t.Parallel() + + defer func() { + panicked := recover() + if panicked != nil && !tc.shouldPanic { + t.Fatalf("panicked %v", panicked) + } + }() + + // if this conversion panics, the defer function checks if this is normal + value := tc.function(tc.value) + + if tc.shouldPanic { + t.Fatal("should had panicked") + } + + // if conversion should panic expected value is undefined, so don't assert it + if value != tc.expected { + t.Fatalf("expected %s got %s", tc.expected, value) + } + }) + } +} diff --git a/pkg/utils/kubernetes.go b/pkg/utils/kubernetes.go index 806ba867..c66fe405 100644 --- a/pkg/utils/kubernetes.go +++ b/pkg/utils/kubernetes.go @@ -3,70 +3,53 @@ package utils import ( "fmt" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" ) -// getTargetPort returns the ServicePort object that corresponds to the given port number -// if the given port is 0, it will return the first port or error if more than one port is defined -func getTargetPort(service corev1.Service, svcPort uint) (corev1.ServicePort, error) { - ports := service.Spec.Ports - if svcPort != 0 { - for _, p := range ports { - if uint(p.Port) == svcPort { - return p, nil - } +// GetTargetPort returns the target port for the given service port +func GetTargetPort(service corev1.Service, svcPort intstr.IntOrString) (intstr.IntOrString, error) { + // Handle default port mapping + // TODO: make port required + if svcPort.IsNull() || svcPort.IsZero() { + if len(service.Spec.Ports) > 1 { + return intstr.NullValue, fmt.Errorf("no port selected and service exposes more than one service") } - return corev1.ServicePort{}, fmt.Errorf("the service does not expose the given svcPort: %d", svcPort) + return intstr.IntOrString(service.Spec.Ports[0].TargetPort.String()), nil } - if len(ports) > 1 { - return corev1.ServicePort{}, fmt.Errorf("service exposes multiple ports. Port option must be defined") + for _, p := range service.Spec.Ports { + if svcPort.Str() == p.Name || (svcPort.IsInt() && svcPort.Int32() == p.Port) { + return intstr.IntOrString(p.TargetPort.String()), nil + } } - return ports[0], nil + return intstr.NullValue, fmt.Errorf("the service does not expose the given svcPort: %s", svcPort) } -// MapPort returns the port in the Pod that maps to the given service port -func MapPort(service corev1.Service, port uint, pod corev1.Pod) (uint, error) { - svcPort, err := getTargetPort(service, port) - if err != nil { - return 0, err - } - - switch svcPort.TargetPort.Type { - case intstr.String: +// FindPort returns the port in the Pod that maps to the given port by port number or name +func FindPort(port intstr.IntOrString, pod corev1.Pod) (intstr.IntOrString, error) { + switch port.Type() { + case intstr.ValueTypeString: for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.Name == svcPort.TargetPort.StrVal { - return uint(port.ContainerPort), nil + for _, p := range container.Ports { + if p.Name == port.Str() { + return intstr.FromInt32(p.ContainerPort), nil } } } - case intstr.Int: + case intstr.ValueTypeInt: for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.ContainerPort == svcPort.TargetPort.IntVal { - return uint(port.ContainerPort), nil + for _, p := range container.Ports { + if p.ContainerPort == port.Int32() { + return intstr.FromInt32(p.ContainerPort), nil } } } } - return 0, fmt.Errorf("pod %q does match port %d for service %q", pod.Name, port, service.Name) -} - -// HasPort verifies if a pods listen to the given port -func HasPort(pod corev1.Pod, port uint) bool { - for _, container := range pod.Spec.Containers { - for _, containerPort := range container.Ports { - if uint(containerPort.ContainerPort) == port { - return true - } - } - } - return false + return intstr.NullValue, fmt.Errorf("pod %q does exports port %q", pod.Name, port.Str()) } // HasHostNetwork returns whether a pod has HostNetwork enabled, i.e. it shares the host's network namespace. diff --git a/pkg/utils/kubernetes_test.go b/pkg/utils/kubernetes_test.go index 8afaae19..a796f60d 100644 --- a/pkg/utils/kubernetes_test.go +++ b/pkg/utils/kubernetes_test.go @@ -5,9 +5,10 @@ import ( corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" + k8sintstr "k8s.io/apimachinery/pkg/util/intstr" "github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders" + "github.com/grafana/xk6-disruptor/pkg/types/intstr" ) func buildPodWithPort(name string, portName string, port int32) corev1.Pod { @@ -22,7 +23,7 @@ func buildPodWithPort(name string, portName string, port int32) corev1.Pod { return pod } -func buildServicWithPort(name string, portName string, port int32, target intstr.IntOrString) corev1.Service { +func buildServicWithPort(name string, portName string, port int32, target k8sintstr.IntOrString) corev1.Service { return builders.NewServiceBuilder(name). WithNamespace("test-ns"). WithSelectorLabel("app", "test"). @@ -30,69 +31,43 @@ func buildServicWithPort(name string, portName string, port int32, target intstr Build() } -func Test_ServicePortMapping(t *testing.T) { +func Test_FindPort(t *testing.T) { t.Parallel() testCases := []struct { title string - serviceName string - namespace string - service corev1.Service pod corev1.Pod - endpoints *corev1.Endpoints - port uint + port intstr.IntOrString expectError bool - expected uint + expected intstr.IntOrString }{ { - title: "invalid Port option", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, intstr.FromInt(8080)), - pod: buildPodWithPort("pod-1", "http", 80), - port: 80, - expectError: true, - expected: 0, - }, - { - title: "Numeric target port specified", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, intstr.FromInt(80)), + title: "Numeric port", pod: buildPodWithPort("pod-1", "http", 80), - port: 8080, + port: intstr.FromInt32(80), expectError: false, - expected: 80, + expected: intstr.FromInt32(80), }, { - title: "Named target port", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, intstr.FromString("http")), + title: "Numeric port not exposed", pod: buildPodWithPort("pod-1", "http", 80), - port: 8080, - expectError: false, - expected: 80, + port: intstr.FromInt32(8080), + expectError: true, + expected: intstr.NullValue, }, { - title: "Default port mapping", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, intstr.FromInt(80)), + title: "Named port", pod: buildPodWithPort("pod-1", "http", 80), - port: 0, + port: intstr.FromString("http"), expectError: false, - expected: 80, + expected: intstr.FromInt32(80), }, { - title: "No target for mapping", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, intstr.FromInt(80)), - pod: buildPodWithPort("pod-1", "http", 8080), - port: 8080, + title: "Named port not exposed port", + pod: buildPodWithPort("pod-1", "http", 80), + port: intstr.FromString("http2"), expectError: true, - expected: 0, + expected: intstr.NullValue, }, } @@ -102,7 +77,7 @@ func Test_ServicePortMapping(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - port, err := MapPort(tc.service, tc.port, tc.pod) + port, err := FindPort(tc.port, tc.pod) if !tc.expectError && err != nil { t.Errorf(" failed: %v", err) return @@ -118,53 +93,91 @@ func Test_ServicePortMapping(t *testing.T) { } if tc.expected != port { - t.Errorf("expected %d got %d", tc.expected, port) + t.Errorf("expected %q got %q", tc.expected.Str(), port.Str()) return } }) } } -func Test_ValidatePort(t *testing.T) { +func Test_GetTargetPort(t *testing.T) { t.Parallel() testCases := []struct { - title string - namespace string - pod corev1.Pod - targetPort uint - expect bool + title string + + service corev1.Service + endpoints *corev1.Endpoints + port intstr.IntOrString + expectError bool + expected intstr.IntOrString }{ { - title: "Pods listen to the specified port", - namespace: "testns", - pod: builders.NewPodBuilder("test-pod-1"). - WithContainer(corev1.Container{Ports: []corev1.ContainerPort{{ContainerPort: 8080}}}). - WithNamespace("testns"). - Build(), - targetPort: 8080, - expect: true, + title: "Numeric service port specified", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + port: intstr.FromInt32(8080), + expectError: false, + expected: intstr.FromInt32(80), + }, + { + title: "Named service port", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + port: intstr.FromString("http"), + expectError: false, + expected: intstr.FromInt32(80), + }, + { + title: "Named target port", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromString("http")), + port: intstr.FromInt32(8080), + expectError: false, + expected: intstr.FromString("http"), + }, + { + title: "Default port mapping", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + port: intstr.FromInt32(0), + expectError: false, + expected: intstr.FromInt32(80), + }, + { + title: "Numeric port not exposed", + service: buildServicWithPort("test-svc", "http", 80, k8sintstr.FromInt(80)), + port: intstr.FromInt32(8080), + expectError: true, }, { - title: "Pod doesn't listen to the specified port", - namespace: "testns", - pod: builders.NewPodBuilder("test-pod-2"). - WithContainer(corev1.Container{Ports: []corev1.ContainerPort{{ContainerPort: 9090}}}). - WithNamespace("testns"). - Build(), - targetPort: 8080, - expect: false, + title: "Named port not exposed", + service: buildServicWithPort("test-svc", "http", 80, k8sintstr.FromString("http")), + port: intstr.FromString("http2"), + expectError: true, }, } for _, tc := range testCases { tc := tc + t.Run(tc.title, func(t *testing.T) { t.Parallel() - validation := HasPort(tc.pod, tc.targetPort) - if validation != tc.expect { - t.Errorf("expected %t got %t", tc.expect, validation) + port, err := GetTargetPort(tc.service, tc.port) + if !tc.expectError && err != nil { + t.Errorf(" failed: %v", err) + return + } + + if tc.expectError && err == nil { + t.Errorf("should had failed") + return + } + + if tc.expectError && err != nil { + return + } + + if tc.expected != port { + t.Errorf("expected %q got %q", tc.expected.Str(), port.Str()) + return } }) }