From f700c2dba9173bf5bcc91e9b75a77213a2d5778e Mon Sep 17 00:00:00 2001 From: Michael Zalimeni Date: Thu, 21 Sep 2023 15:53:23 -0400 Subject: [PATCH] [NET-5674] v2: Conditional target port when numeric in k8s (#2978) v2: Conditional target port when numeric in k8s When choosing a Consul service TargetPort (name) in the v2 Endpoints Controller, attempt to find the best match among endpoint container ports when the K8s service target port value is a number. This bridges an existing gap between Consul (which always expects a Workload port to be named), and K8s (where container ports need not be named, and names are ignored when the K8s service targets by number). This change will be mostly reverted in a future release once Consul's v2 data model allows for target ports to be a name or number in alignment w/ K8s behavior. --- .../bases/v2-multiport-app/deployment.yaml | 2 + .../bases/v2-multiport-app/service.yaml | 4 +- control-plane/connect-inject/common/common.go | 3 + .../connect-inject/common/common_test.go | 11 + .../endpointsv2/endpoints_controller.go | 247 +++++++-- .../endpointsv2/endpoints_controller_test.go | 499 ++++++++++++++++-- 6 files changed, 686 insertions(+), 80 deletions(-) diff --git a/acceptance/tests/fixtures/bases/v2-multiport-app/deployment.yaml b/acceptance/tests/fixtures/bases/v2-multiport-app/deployment.yaml index 0aefa14d50..77c7de3bc9 100644 --- a/acceptance/tests/fixtures/bases/v2-multiport-app/deployment.yaml +++ b/acceptance/tests/fixtures/bases/v2-multiport-app/deployment.yaml @@ -56,6 +56,8 @@ spec: - -listen=:9090 ports: - containerPort: 9090 + # This name is meant to be used alongside the _numeric_ K8s service target port + # to verify that we can still route traffic to the named port when there's a mismatch. name: admin # TODO: (v2/nitya) add these probes back when expose paths and L7 are supported. # livenessProbe: diff --git a/acceptance/tests/fixtures/bases/v2-multiport-app/service.yaml b/acceptance/tests/fixtures/bases/v2-multiport-app/service.yaml index 940a9a3a17..fe47663c3d 100644 --- a/acceptance/tests/fixtures/bases/v2-multiport-app/service.yaml +++ b/acceptance/tests/fixtures/bases/v2-multiport-app/service.yaml @@ -11,8 +11,8 @@ spec: ports: - name: web port: 8080 - # TODO: (v2/nitya) ensure this works with numeric target ports also once support for that is merged. targetPort: web - name: admin port: 9090 - targetPort: admin + # Test with a mix of named and numeric target ports. + targetPort: 9090 diff --git a/control-plane/connect-inject/common/common.go b/control-plane/connect-inject/common/common.go index 5ad336f327..402f17bbb3 100644 --- a/control-plane/connect-inject/common/common.go +++ b/control-plane/connect-inject/common/common.go @@ -184,6 +184,9 @@ func PortValueFromIntOrString(pod corev1.Pod, port intstr.IntOrString) (uint32, // HasBeenMeshInjected checks the value of the status annotation and returns true if the Pod has been injected. // Does not apply to V1 pods, which use a different key (`constants.KeyInjectStatus`). func HasBeenMeshInjected(pod corev1.Pod) bool { + if pod.Annotations == nil { + return false + } if anno, ok := pod.Annotations[constants.KeyMeshInjectStatus]; ok && anno == constants.Injected { return true } diff --git a/control-plane/connect-inject/common/common_test.go b/control-plane/connect-inject/common/common_test.go index 3bf19c72d4..86618088f0 100644 --- a/control-plane/connect-inject/common/common_test.go +++ b/control-plane/connect-inject/common/common_test.go @@ -457,6 +457,17 @@ func TestHasBeenMeshInjected(t *testing.T) { }, expected: false, }, + { + name: "Pod with nil annotations", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + }, + }, + expected: false, + }, } for _, tt := range cases { diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go index 9b7cfa8971..b652aa3b6b 100644 --- a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go @@ -15,6 +15,7 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -60,7 +61,6 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { // Reconcile reads the state of an Endpoints object for a Kubernetes Service and reconciles Consul services which // correspond to the Kubernetes Service. These events are driven by changes to the Pods backing the Kube service. func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - var errs error var endpoints corev1.Endpoints var service corev1.Service @@ -96,38 +96,74 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } r.Log.Info("retrieved Service", "name", req.Name, "ns", req.Namespace) - workloadSelector, err := r.getWorkloadSelectorFromEndpoints(ctx, &ClientPodFetcher{client: r.Client}, &endpoints) + consulSvc, err := r.getConsulService(ctx, &ClientPodFetcher{client: r.Client}, service, endpoints) if err != nil { - errs = multierror.Append(errs, err) + r.Log.Error(err, "failed to build Consul service resource", "name", req.Name, "ns", req.Namespace) + return ctrl.Result{}, err } - // If we have at least one mesh-injected pod targeted by the service, register it in Consul. + // If we don't have at least one mesh-injected pod targeted by the service, do not register the service. //TODO: Register service with mesh port added if global flag for inject is true, // even if Endpoints are empty or have no mesh pod, iff. the service has a selector. // This should ensure that we don't target kube or consul (system) services. - if workloadSelector != nil { - //TODO: Maybe check service-enable label here on service/deployments/other pod owners - if err = r.registerService(ctx, resourceClient, service, workloadSelector); err != nil { - // We could be racing with the namespace controller. - // Requeue (which includes backoff) to try again. - if common.ConsulNamespaceIsNotFound(err) { - r.Log.Info("Consul namespace not found; re-queueing request", - "service", service.GetName(), "ns", req.Namespace, - "consul-ns", r.getConsulNamespace(req.Namespace), "err", err.Error()) - return ctrl.Result{Requeue: true}, nil - } - errs = multierror.Append(errs, err) + if consulSvc.Workloads == nil { + return ctrl.Result{}, nil + } + + // Register the service in Consul. + //TODO: Maybe check service-enable label here on service/deployments/other pod owners + if err = r.registerService(ctx, resourceClient, service, consulSvc); err != nil { + // We could be racing with the namespace controller. + // Requeue (which includes backoff) to try again. + if common.ConsulNamespaceIsNotFound(err) { + r.Log.Info("Consul namespace not found; re-queueing request", + "service", service.GetName(), "ns", req.Namespace, + "consul-ns", r.getConsulNamespace(req.Namespace), "err", err.Error()) + return ctrl.Result{Requeue: true}, nil } + return ctrl.Result{}, err } - return ctrl.Result{}, errs + + return ctrl.Result{}, nil } -// getWorkloadSelectorFromEndpoints calculates a Consul service WorkloadSelector from Endpoints based on pod names and -// owners. -func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf PodFetcher, endpoints *corev1.Endpoints) (*pbcatalog.WorkloadSelector, error) { - podPrefixes := make(map[string]any) - podExactNames := make(map[string]any) +func (r *Controller) getConsulService(ctx context.Context, pf PodFetcher, service corev1.Service, endpoints corev1.Endpoints) (*pbcatalog.Service, error) { + prefixedPods, exactNamePods, err := r.getWorkloadDataFromEndpoints(ctx, pf, endpoints) + if err != nil { + return nil, err + } + + // Create Consul Service resource to be registered. + return &pbcatalog.Service{ + Workloads: getWorkloadSelector(prefixedPods, exactNamePods), + Ports: getServicePorts(service, prefixedPods, exactNamePods), + VirtualIps: r.getServiceVIPs(service), + }, nil +} + +type podSetData struct { + podCount int + samplePod *corev1.Pod +} + +// selectorPodData represents data for each set of pods represented by a WorkloadSelector value. +// The data may be for several pods (prefix) or a single pod (exact name). +// This is used for choosing the ideal Consul service TargetPort value when the K8s service target port is numeric. +type selectorPodData map[string]*podSetData + +// getWorkloadDataFromEndpoints accumulates data to supply the Consul service WorkloadSelector and TargetPort from +// Endpoints based on pod names and owners. +func (r *Controller) getWorkloadDataFromEndpoints(ctx context.Context, pf PodFetcher, endpoints corev1.Endpoints) (selectorPodData, selectorPodData, error) { var errs error + + // Determine the workload selector by fetching as many pods as needed to accumulate prefixes + // and exact pod name matches. + // + // If the K8s service target port is numeric, we also use this information to determine the + // appropriate Consul target port value. + prefixedPods := make(selectorPodData) + exactNamePods := make(selectorPodData) + ignoredPodPrefixes := make(map[string]any) for address := range allAddresses(endpoints.Subsets) { if address.TargetRef != nil && address.TargetRef.Kind == "Pod" { podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: endpoints.Namespace} @@ -137,7 +173,14 @@ func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf Po // If not, fetch the owner. If the owner has a unique prefix, add it to known prefixes. // If not, add the pod name to exact name matches. maybePodOwnerPrefix := getOwnerPrefixFromPodName(podName.Name) - if _, ok := podPrefixes[maybePodOwnerPrefix]; !ok { + + // If prefix is ignored, skip pod. + if _, ok := ignoredPodPrefixes[maybePodOwnerPrefix]; ok { + continue + } + + if existingPodData, ok := prefixedPods[maybePodOwnerPrefix]; !ok { + // Fetch pod info from K8s. pod, err := pf.GetPod(ctx, podName) if err != nil { r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", endpoints.Namespace) @@ -145,22 +188,41 @@ func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf Po continue } - // If the pod hasn't been mesh-injected, skip it, as it won't be available as a workload. - if !common.HasBeenMeshInjected(*pod) { - continue + // Store data corresponding to the new selector value, which may be an actual set or exact pod. + podData := podSetData{ + podCount: 1, + samplePod: pod, } - // Add to workload selector values. + // Add pod to workload selector values as appropriate. // Pods can appear more than once in Endpoints subsets, so we use a set for exact names as well. if prefix := getOwnerPrefixFromPod(pod); prefix != "" { - podPrefixes[prefix] = true + if common.HasBeenMeshInjected(*pod) { + // Add to the list of pods represented by this prefix. This list is used by + // `getEffectiveTargetPort` to determine the most-used target container port name if the + // k8s service target port is numeric. + prefixedPods[prefix] = &podData + } else { + // If the pod hasn't been mesh-injected, ignore it, as it won't be available as a workload. + // Remember its prefix to avoid fetching its siblings needlessly. + ignoredPodPrefixes[prefix] = true + } } else { - podExactNames[podName.Name] = true + if common.HasBeenMeshInjected(*pod) { + exactNamePods[podName.Name] = &podData + } + // If the pod hasn't been mesh-injected, ignore it, as it won't be available as a workload. + // No need to remember ignored exact pod names since we don't expect to see them twice. } + } else { + // We've seen this prefix before. + // Keep track of how many times so that we can choose a container port name if needed later. + existingPodData.podCount += 1 } } } - return getWorkloadSelector(podPrefixes, podExactNames), errs + + return prefixedPods, exactNamePods, errs } // allAddresses combines all Endpoints subset addresses to a single set. Service registration by this controller @@ -202,18 +264,13 @@ func getOwnerPrefixFromPod(pod *corev1.Pod) string { } // registerService creates a Consul service registration from the provided Kuberetes service and endpoint information. -func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error { - consulSvc := &pbcatalog.Service{ - Workloads: selector, - Ports: getServicePorts(service), - VirtualIps: r.getServiceVIPs(service), - } +func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, k8sService corev1.Service, consulSvc *pbcatalog.Service) error { consulSvcResource := r.getServiceResource( consulSvc, - service.Name, // Consul and Kubernetes service name will always match - r.getConsulNamespace(service.Namespace), + k8sService.Name, // Consul and Kubernetes service name will always match + r.getConsulNamespace(k8sService.Namespace), r.getConsulPartition(), - getServiceMeta(service), + getServiceMeta(k8sService), ) r.Log.Info("registering service with Consul", getLogFieldsForResource(consulSvcResource.Id)...) @@ -253,7 +310,7 @@ func getServiceID(name, namespace, partition string) *pbresource.ID { } // getServicePorts converts Kubernetes Service ports data into Consul service ports. -func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort { +func getServicePorts(service corev1.Service, prefixedPods selectorPodData, exactNamePods selectorPodData) []*pbcatalog.ServicePort { ports := make([]*pbcatalog.ServicePort, 0, len(service.Spec.Ports)+1) for _, p := range service.Spec.Ports { @@ -266,10 +323,8 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort { if p.Protocol == corev1.ProtocolTCP { ports = append(ports, &pbcatalog.ServicePort{ VirtualPort: uint32(p.Port), - //TODO: If the value is a number, infer the correct name value based on - // the most prevalent endpoint subset for the port (best-effot, inspect a pod). - TargetPort: p.TargetPort.String(), - Protocol: common.GetPortProtocol(p.AppProtocol), + TargetPort: getEffectiveTargetPort(p.TargetPort, prefixedPods, exactNamePods), + Protocol: common.GetPortProtocol(p.AppProtocol), }) } } @@ -285,6 +340,102 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort { return ports } +func getEffectiveTargetPort(targetPort intstr.IntOrString, prefixedPods selectorPodData, exactNamePods selectorPodData) string { + // The Kubernetes service is targeting a port name; use it directly. + // The expected behavior of Kubernetes is that all included Endpoints conform and have a matching named port. + // This is the simplest path and preferred over services targeting by port number. + if targetPort.Type == intstr.String { + return targetPort.String() + } + + // The Kubernetes service is targeting a numeric port. This is more complicated for mapping to Consul: + // - Endpoints will contain _all_ selected pods, not just those with a matching declared port number. + // - Consul Workload ports always have a name, so we must determine the best name to match on. + // - There may be more than one option among the pods with named ports, including no name at all. + // + // Our best-effort approach is to find the most prevalent port name among selected pods that _do_ declare the target + // port explicitly in container ports. We'll assume that for each set of pods, the first pod is "representative" - + // i.e. we expect a ReplicaSet to be homogenous. In the vast majority of cases, this means we'll be looking for the + // largest selected ReplicaSet and using the first pod from it. + // + // The goal is to make this determination without fetching all pods belonging to the service, as that would be a + // very expensive operation to repeat every time endpoints change, and we don't expect the target port to change + // often if ever across pod/deployment lifecycles. + // + //TODO in GA, we intend to change port selection to allow for Consul TargetPort to be numeric. If we retain the + // port selection model used here beyond GA, we should consider updating it to also consider pod health, s.t. when + // the selected port name changes between deployments of a ReplicaSet, we route traffic to ports belonging to the + // set most able to serve traffic, rather than simply the largest one. + targetPortInt := int32(targetPort.IntValue()) + var mostPrevalentContainerPort *corev1.ContainerPort + maxCount := 0 + effectiveNameForPort := func(port *corev1.ContainerPort) string { + if port.Name != "" { + return port.Name + } + return targetPort.String() + } + for _, podData := range prefixedPods { + containerPort := getTargetContainerPort(targetPortInt, podData.samplePod) + + // Ignore pods without a declared port matching the service targetPort. + if containerPort == nil { + continue + } + + // If this is the most prevalent container port by pod set size, update result. + if maxCount < podData.podCount { + mostPrevalentContainerPort = containerPort + maxCount = podData.podCount + } + } + + if mostPrevalentContainerPort != nil { + return effectiveNameForPort(mostPrevalentContainerPort) + } + + // If no pod sets have the expected target port, fall back to the most common name among exact-name pods. + // An assumption here is that exact name pods mixed with pod sets will be rare, and sets should be preferred. + if len(exactNamePods) > 0 { + nameCount := make(map[string]int) + for _, podData := range exactNamePods { + if containerPort := getTargetContainerPort(targetPortInt, podData.samplePod); containerPort != nil { + nameCount[effectiveNameForPort(containerPort)] += 1 + } + } + if len(nameCount) > 0 { + maxNameCount := 0 + mostPrevalentContainerPortName := "" + for name, count := range nameCount { + if maxNameCount < count { + mostPrevalentContainerPortName = name + maxNameCount = count + } + } + return mostPrevalentContainerPortName + } + } + + // If still no match for the target port, fall back to string-ifying the target port name, which + // is what the PodController will do when converting unnamed ContainerPorts to Workload ports. + return targetPort.String() +} + +// getTargetContainerPort returns the pod ContainerPort matching the given numeric port value, or nil if none is found. +func getTargetContainerPort(targetPort int32, pod *corev1.Pod) *corev1.ContainerPort { + for _, c := range pod.Spec.Containers { + if len(c.Ports) == 0 { + continue + } + for _, p := range c.Ports { + if p.ContainerPort == targetPort && p.Protocol == corev1.ProtocolTCP { + return &p + } + } + } + return nil +} + // getServiceVIPs returns the VIPs to associate with the registered Consul service. This will contain the Kubernetes // Service ClusterIP if it exists. // @@ -309,18 +460,18 @@ func getServiceMeta(service corev1.Service) map[string]string { // getWorkloadSelector returns the WorkloadSelector for the given pod name prefixes and exact names. // It returns nil if the provided name sets are empty. -func getWorkloadSelector(podPrefixes, podExactNames map[string]any) *pbcatalog.WorkloadSelector { +func getWorkloadSelector(prefixedPods selectorPodData, exactNamePods selectorPodData) *pbcatalog.WorkloadSelector { // If we don't have any values, return nil - if len(podPrefixes) == 0 && len(podExactNames) == 0 { + if len(prefixedPods) == 0 && len(exactNamePods) == 0 { return nil } // Create the WorkloadSelector workloads := &pbcatalog.WorkloadSelector{} - for v := range podPrefixes { + for v := range prefixedPods { workloads.Prefixes = append(workloads.Prefixes, v) } - for v := range podExactNames { + for v := range exactNamePods { workloads.Names = append(workloads.Names, v) } diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go index 1762471a46..f696e06150 100644 --- a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go @@ -35,6 +35,10 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) +const ( + kindDaemonSet = "DaemonSet" +) + var ( appProtocolHttp = "http" appProtocolHttp2 = "http2" @@ -158,7 +162,7 @@ func TestReconcile_CreateService(t *testing.T) { svcName: "service-created", k8sObjects: func() []runtime.Object { pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") - pod2 := createServicePod("DaemonSet", "service-created-ds", "12345") + pod2 := createServicePod(kindDaemonSet, "service-created-ds", "12345") endpoints := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-created", @@ -466,6 +470,385 @@ func TestReconcile_CreateService(t *testing.T) { }, }, }, + { + name: "Numeric service target port: Named container port gets the pod port name", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde", + // Named port with container port value matching service target port + containerWithPort("named-port", 2345), + // Unnamed port with container port value matching service target port + containerWithPort("", 6789)) + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1), + Ports: []corev1.EndpointPort{ + { + Name: "public", + Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, + }, + { + Name: "api", + Port: 6789, + Protocol: "TCP", + AppProtocol: &appProtocolGrpc, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + Protocol: "TCP", + TargetPort: intstr.FromInt(2345), // Numeric target port + AppProtocol: &appProtocolHttp, + }, + { + Name: "api", + Port: 9090, + Protocol: "TCP", + TargetPort: intstr.FromInt(6789), // Numeric target port + AppProtocol: &appProtocolGrpc, + }, + { + Name: "unmatched-port", + Port: 10010, + Protocol: "TCP", + TargetPort: intstr.FromInt(10010), // Numeric target port + AppProtocol: &appProtocolHttp, + }, + }, + }, + } + return []runtime.Object{pod1, endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "named-port", // Matches container port name, not service target number + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "6789", // Matches service target number + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + VirtualPort: 10010, + TargetPort: "10010", // Matches service target number (unmatched by container ports) + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + constants.MetaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Numeric service target port: Container port mix gets the name from largest matching pod set", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + // Unnamed port matching service target port. + // Also has second named port, and is not the most prevalent set for that port. + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde", + containerWithPort("", 2345), + containerWithPort("api-port", 6789)) + + // Named port with different name from most prevalent pods. + // Also has second unnamed port, and _is_ the most prevalent set for that port. + pod2a := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-fghij", + containerWithPort("another-port-name", 2345), + containerWithPort("", 6789)) + pod2b := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-fghij", + containerWithPort("another-port-name", 2345), + containerWithPort("", 6789)) + + // Named port with container port value matching service target port. + // The most common "set" of pods, so should become the port name for service target port. + pod3a := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-klmno", + containerWithPort("named-port", 2345)) + pod3b := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-klmno", + containerWithPort("named-port", 2345)) + pod3c := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-klmno", + containerWithPort("named-port", 2345)) + + // Named port that does not match service target port. + // More common "set" of pods selected by the service, but does not have a target port (value) match. + pod4a := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-pqrst", + containerWithPort("non-matching-named-port", 5432)) + pod4b := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-pqrst", + containerWithPort("non-matching-named-port", 5432)) + pod4c := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-pqrst", + containerWithPort("non-matching-named-port", 5432)) + pod4d := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-pqrst", + containerWithPort("non-matching-named-port", 5432)) + + // Named port from non-injected pods. + // More common "set" of pods selected by the service, but should be filtered out. + pod5a := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-uvwxy", + containerWithPort("ignored-named-port", 2345)) + pod5b := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-uvwxy", + containerWithPort("ignored-named-port", 2345)) + pod5c := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-uvwxy", + containerWithPort("ignored-named-port", 2345)) + pod5d := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-uvwxy", + containerWithPort("ignored-named-port", 2345)) + for _, p := range []*corev1.Pod{pod5a, pod5b, pod5c, pod5d} { + removeMeshInjectStatus(t, p) + } + + // Named port with container port value matching service target port. + // Single pod from non-ReplicaSet owner. Should not take precedence over set pods. + pod6a := createServicePod(kindDaemonSet, "service-created-ds", "12345", + containerWithPort("another-port-name", 2345)) + + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods( + pod1, + pod2a, pod2b, + pod3a, pod3b, pod3c, + pod4a, pod4b, pod4c, pod4d, + pod5a, pod5b, pod5c, pod5d, + pod6a), + Ports: []corev1.EndpointPort{ + { + Name: "public", + Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + Protocol: "TCP", + TargetPort: intstr.FromInt(2345), // Numeric target port + AppProtocol: &appProtocolHttp, + }, + { + Name: "api", + Port: 9090, + Protocol: "TCP", + TargetPort: intstr.FromInt(6789), // Numeric target port + AppProtocol: &appProtocolGrpc, + }, + }, + }, + } + return []runtime.Object{ + pod1, + pod2a, pod2b, + pod3a, pod3b, pod3c, + pod4a, pod4b, pod4c, pod4d, + pod5a, pod5b, pod5c, pod5d, + pod6a, + endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "named-port", // Matches container port name, not service target number + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "6789", // Matches service target number due to unnamed being most common + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{ + "service-created-rs-abcde", + "service-created-rs-fghij", + "service-created-rs-klmno", + "service-created-rs-pqrst", + }, + Names: []string{ + "service-created-ds-12345", + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + constants.MetaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Numeric service target port: Most used container port name from exact name pods used when no pod sets present", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + // Named port with different name from most prevalent pods. + pod1a := createServicePod(kindDaemonSet, "service-created-ds1", "12345", + containerWithPort("another-port-name", 2345)) + + // Named port with container port value matching service target port. + // The most common container port name, so should become the port name for service target port. + pod2a := createServicePod(kindDaemonSet, "service-created-ds2", "12345", + containerWithPort("named-port", 2345)) + pod2b := createServicePod(kindDaemonSet, "service-created-ds2", "23456", + containerWithPort("named-port", 2345)) + + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods( + pod1a, + pod2a, pod2b), + Ports: []corev1.EndpointPort{ + { + Name: "public", + Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + Protocol: "TCP", + TargetPort: intstr.FromInt(2345), // Numeric target port + AppProtocol: &appProtocolHttp, + }, + }, + }, + } + return []runtime.Object{ + pod1a, + pod2a, pod2b, + endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "named-port", // Matches container port name, not service target number + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Names: []string{ + "service-created-ds1-12345", + "service-created-ds2-12345", + "service-created-ds2-23456", + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + constants.MetaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, { name: "Only L4 TCP ports get a Consul Service port when L4 protocols are multiplexed", svcName: "service-created", @@ -561,8 +944,7 @@ func TestReconcile_CreateService(t *testing.T) { svcName: "service-created", k8sObjects: func() []runtime.Object { pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") - // Clear mesh inject status - delete(pod1.Annotations, constants.KeyMeshInjectStatus) + removeMeshInjectStatus(t, pod1) endpoints := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-created", @@ -621,8 +1003,8 @@ func TestReconcile_UpdateService(t *testing.T) { k8sObjects: func() []runtime.Object { pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") pod2 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-klmno") - pod3 := createServicePod("DaemonSet", "service-created-ds", "12345") - pod4 := createServicePod("DaemonSet", "service-created-ds", "34567") + pod3 := createServicePod(kindDaemonSet, "service-created-ds", "12345") + pod4 := createServicePod(kindDaemonSet, "service-created-ds", "34567") endpoints := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -753,7 +1135,7 @@ func TestReconcile_UpdateService(t *testing.T) { svcName: "service-updated", k8sObjects: func() []runtime.Object { pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") - pod2 := createServicePod("DaemonSet", "service-created-ds", "12345") + pod2 := createServicePod(kindDaemonSet, "service-created-ds", "12345") endpoints := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -961,7 +1343,7 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { type testCase struct { name string - endpoints *corev1.Endpoints + endpoints corev1.Endpoints responses map[types.NamespacedName]*corev1.Pod expected *pbcatalog.WorkloadSelector mockFn func(*testing.T, *MockPodFetcher) @@ -976,13 +1358,19 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { createServicePod(kindReplicaSet, "svc-rs-fghij", "34567"), } otherPods := []*corev1.Pod{ - createServicePod("DaemonSet", "svc-ds", "12345"), - createServicePod("DaemonSet", "svc-ds", "23456"), - createServicePod("DaemonSet", "svc-ds", "34567"), + createServicePod(kindDaemonSet, "svc-ds", "12345"), + createServicePod(kindDaemonSet, "svc-ds", "23456"), + createServicePod(kindDaemonSet, "svc-ds", "34567"), createServicePod("StatefulSet", "svc-ss", "12345"), createServicePod("StatefulSet", "svc-ss", "23456"), createServicePod("StatefulSet", "svc-ss", "34567"), } + ignoredPods := []*corev1.Pod{ + createServicePod(kindReplicaSet, "svc-rs-ignored-klmno", "12345"), + createServicePod(kindReplicaSet, "svc-rs-ignored-klmno", "23456"), + createServicePod(kindReplicaSet, "svc-rs-ignored-klmno", "34567"), + } + podsByName := make(map[types.NamespacedName]*corev1.Pod) for _, p := range rsPods { podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p @@ -990,11 +1378,15 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { for _, p := range otherPods { podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p } + for _, p := range ignoredPods { + removeMeshInjectStatus(t, p) + podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p + } cases := []testCase{ { name: "Pod is fetched once per ReplicaSet", - endpoints: &corev1.Endpoints{ + endpoints: corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "svc", Namespace: "default", @@ -1015,11 +1407,11 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { responses: podsByName, expected: getWorkloadSelector( // Selector should consist of prefixes only. - map[string]any{ - "svc-rs-abcde": true, - "svc-rs-fghij": true, + selectorPodData{ + "svc-rs-abcde": &podSetData{}, + "svc-rs-fghij": &podSetData{}, }, - map[string]any{}), + selectorPodData{}), mockFn: func(t *testing.T, pf *MockPodFetcher) { // Assert called once per set. require.Equal(t, 2, len(pf.calls)) @@ -1027,7 +1419,7 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { }, { name: "Pod is fetched once per other pod owner type", - endpoints: &corev1.Endpoints{ + endpoints: corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "svc", Namespace: "default", @@ -1048,21 +1440,47 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { responses: podsByName, expected: getWorkloadSelector( // Selector should consist of exact name matches only. - map[string]any{}, - map[string]any{ - "svc-ds-12345": true, - "svc-ds-23456": true, - "svc-ds-34567": true, - "svc-ss-12345": true, - "svc-ss-23456": true, - "svc-ss-34567": true, + selectorPodData{}, + selectorPodData{ + "svc-ds-12345": &podSetData{}, + "svc-ds-23456": &podSetData{}, + "svc-ds-34567": &podSetData{}, + "svc-ss-12345": &podSetData{}, + "svc-ss-23456": &podSetData{}, + "svc-ss-34567": &podSetData{}, }), mockFn: func(t *testing.T, pf *MockPodFetcher) { // Assert called once per pod. require.Equal(t, len(otherPods), len(pf.calls)) }, }, - //TODO: Add cases to cover non-injected pod skipping. + { + name: "Pod is ignored if not mesh-injected", + endpoints: corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(ignoredPods...), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + }, + responses: podsByName, + expected: nil, + mockFn: func(t *testing.T, pf *MockPodFetcher) { + // Assert called once for single set. + require.Equal(t, 1, len(pf.calls)) + }, + }, } for _, tc := range cases { @@ -1075,10 +1493,11 @@ func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { Log: logrtest.New(t), } - resp, err := ep.getWorkloadSelectorFromEndpoints(ctx, &pf, tc.endpoints) + prefixedPods, exactNamePods, err := ep.getWorkloadDataFromEndpoints(ctx, &pf, tc.endpoints) require.NoError(t, err) - if diff := cmp.Diff(tc.expected, resp, test.CmpProtoIgnoreOrder()...); diff != "" { + ws := getWorkloadSelector(prefixedPods, exactNamePods) + if diff := cmp.Diff(tc.expected, ws, test.CmpProtoIgnoreOrder()...); diff != "" { t.Errorf("unexpected difference:\n%v", diff) } tc.mockFn(t, &pf) @@ -1191,11 +1610,11 @@ func expectedServiceMatches(t *testing.T, client pbresource.ResourceServiceClien } } -func createServicePodOwnedBy(ownerKind, ownerName string) *corev1.Pod { - return createServicePod(ownerKind, ownerName, randomKubernetesId()) +func createServicePodOwnedBy(ownerKind, ownerName string, containers ...corev1.Container) *corev1.Pod { + return createServicePod(ownerKind, ownerName, randomKubernetesId(), containers...) } -func createServicePod(ownerKind, ownerName, podId string) *corev1.Pod { +func createServicePod(ownerKind, ownerName, podId string, containers ...corev1.Container) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", ownerName, podId), @@ -1212,10 +1631,25 @@ func createServicePod(ownerKind, ownerName, podId string) *corev1.Pod { }, }, }, + Spec: corev1.PodSpec{ + Containers: containers, + }, } return pod } +func containerWithPort(name string, port int32) corev1.Container { + return corev1.Container{ + Ports: []corev1.ContainerPort{ + { + Name: name, + ContainerPort: port, + Protocol: "TCP", + }, + }, + } +} + func addressesForPods(pods ...*corev1.Pod) []corev1.EndpointAddress { var addresses []corev1.EndpointAddress for i, p := range pods { @@ -1238,3 +1672,8 @@ func randomKubernetesId() string { } return u[:5] } + +func removeMeshInjectStatus(t *testing.T, pod *corev1.Pod) { + delete(pod.Annotations, constants.KeyMeshInjectStatus) + require.False(t, common.HasBeenMeshInjected(*pod)) +}