Skip to content

Commit

Permalink
[NET-5674] v2: Conditional target port when numeric in k8s (#2978)
Browse files Browse the repository at this point in the history
v2: Conditional target port when numeric in k8s

When choosing a Consul service TargetPort (name) in the v2 Endpoints
Controller, attempt to find the best match among endpoint container
ports when the K8s service target port value is a number.

This bridges an existing gap between Consul (which always expects a
Workload port to be named), and K8s (where container ports need not be
named, and names are ignored when the K8s service targets by number).

This change will be mostly reverted in a future release once Consul's v2
data model allows for target ports to be a name or number in alignment
w/ K8s behavior.
  • Loading branch information
zalimeni authored Sep 21, 2023
1 parent c1bcdf4 commit f700c2d
Show file tree
Hide file tree
Showing 6 changed files with 686 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
- -listen=:9090
ports:
- containerPort: 9090
# This name is meant to be used alongside the _numeric_ K8s service target port
# to verify that we can still route traffic to the named port when there's a mismatch.
name: admin
# TODO: (v2/nitya) add these probes back when expose paths and L7 are supported.
# livenessProbe:
Expand Down
4 changes: 2 additions & 2 deletions acceptance/tests/fixtures/bases/v2-multiport-app/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ spec:
ports:
- name: web
port: 8080
# TODO: (v2/nitya) ensure this works with numeric target ports also once support for that is merged.
targetPort: web
- name: admin
port: 9090
targetPort: admin
# Test with a mix of named and numeric target ports.
targetPort: 9090
3 changes: 3 additions & 0 deletions control-plane/connect-inject/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func PortValueFromIntOrString(pod corev1.Pod, port intstr.IntOrString) (uint32,
// HasBeenMeshInjected checks the value of the status annotation and returns true if the Pod has been injected.
// Does not apply to V1 pods, which use a different key (`constants.KeyInjectStatus`).
func HasBeenMeshInjected(pod corev1.Pod) bool {
if pod.Annotations == nil {
return false
}
if anno, ok := pod.Annotations[constants.KeyMeshInjectStatus]; ok && anno == constants.Injected {
return true
}
Expand Down
11 changes: 11 additions & 0 deletions control-plane/connect-inject/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,17 @@ func TestHasBeenMeshInjected(t *testing.T) {
},
expected: false,
},
{
name: "Pod with nil annotations",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
Labels: map[string]string{},
},
},
expected: false,
},
}

for _, tt := range cases {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -60,7 +61,6 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {
// Reconcile reads the state of an Endpoints object for a Kubernetes Service and reconciles Consul services which
// correspond to the Kubernetes Service. These events are driven by changes to the Pods backing the Kube service.
func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var errs error
var endpoints corev1.Endpoints
var service corev1.Service

Expand Down Expand Up @@ -96,38 +96,74 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
r.Log.Info("retrieved Service", "name", req.Name, "ns", req.Namespace)

workloadSelector, err := r.getWorkloadSelectorFromEndpoints(ctx, &ClientPodFetcher{client: r.Client}, &endpoints)
consulSvc, err := r.getConsulService(ctx, &ClientPodFetcher{client: r.Client}, service, endpoints)
if err != nil {
errs = multierror.Append(errs, err)
r.Log.Error(err, "failed to build Consul service resource", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
}

// If we have at least one mesh-injected pod targeted by the service, register it in Consul.
// If we don't have at least one mesh-injected pod targeted by the service, do not register the service.
//TODO: Register service with mesh port added if global flag for inject is true,
// even if Endpoints are empty or have no mesh pod, iff. the service has a selector.
// This should ensure that we don't target kube or consul (system) services.
if workloadSelector != nil {
//TODO: Maybe check service-enable label here on service/deployments/other pod owners
if err = r.registerService(ctx, resourceClient, service, workloadSelector); err != nil {
// We could be racing with the namespace controller.
// Requeue (which includes backoff) to try again.
if common.ConsulNamespaceIsNotFound(err) {
r.Log.Info("Consul namespace not found; re-queueing request",
"service", service.GetName(), "ns", req.Namespace,
"consul-ns", r.getConsulNamespace(req.Namespace), "err", err.Error())
return ctrl.Result{Requeue: true}, nil
}
errs = multierror.Append(errs, err)
if consulSvc.Workloads == nil {
return ctrl.Result{}, nil
}

// Register the service in Consul.
//TODO: Maybe check service-enable label here on service/deployments/other pod owners
if err = r.registerService(ctx, resourceClient, service, consulSvc); err != nil {
// We could be racing with the namespace controller.
// Requeue (which includes backoff) to try again.
if common.ConsulNamespaceIsNotFound(err) {
r.Log.Info("Consul namespace not found; re-queueing request",
"service", service.GetName(), "ns", req.Namespace,
"consul-ns", r.getConsulNamespace(req.Namespace), "err", err.Error())
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, errs

return ctrl.Result{}, nil
}

// getWorkloadSelectorFromEndpoints calculates a Consul service WorkloadSelector from Endpoints based on pod names and
// owners.
func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf PodFetcher, endpoints *corev1.Endpoints) (*pbcatalog.WorkloadSelector, error) {
podPrefixes := make(map[string]any)
podExactNames := make(map[string]any)
func (r *Controller) getConsulService(ctx context.Context, pf PodFetcher, service corev1.Service, endpoints corev1.Endpoints) (*pbcatalog.Service, error) {
prefixedPods, exactNamePods, err := r.getWorkloadDataFromEndpoints(ctx, pf, endpoints)
if err != nil {
return nil, err
}

// Create Consul Service resource to be registered.
return &pbcatalog.Service{
Workloads: getWorkloadSelector(prefixedPods, exactNamePods),
Ports: getServicePorts(service, prefixedPods, exactNamePods),
VirtualIps: r.getServiceVIPs(service),
}, nil
}

type podSetData struct {
podCount int
samplePod *corev1.Pod
}

// selectorPodData represents data for each set of pods represented by a WorkloadSelector value.
// The data may be for several pods (prefix) or a single pod (exact name).
// This is used for choosing the ideal Consul service TargetPort value when the K8s service target port is numeric.
type selectorPodData map[string]*podSetData

// getWorkloadDataFromEndpoints accumulates data to supply the Consul service WorkloadSelector and TargetPort from
// Endpoints based on pod names and owners.
func (r *Controller) getWorkloadDataFromEndpoints(ctx context.Context, pf PodFetcher, endpoints corev1.Endpoints) (selectorPodData, selectorPodData, error) {
var errs error

// Determine the workload selector by fetching as many pods as needed to accumulate prefixes
// and exact pod name matches.
//
// If the K8s service target port is numeric, we also use this information to determine the
// appropriate Consul target port value.
prefixedPods := make(selectorPodData)
exactNamePods := make(selectorPodData)
ignoredPodPrefixes := make(map[string]any)
for address := range allAddresses(endpoints.Subsets) {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: endpoints.Namespace}
Expand All @@ -137,30 +173,56 @@ func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf Po
// If not, fetch the owner. If the owner has a unique prefix, add it to known prefixes.
// If not, add the pod name to exact name matches.
maybePodOwnerPrefix := getOwnerPrefixFromPodName(podName.Name)
if _, ok := podPrefixes[maybePodOwnerPrefix]; !ok {

// If prefix is ignored, skip pod.
if _, ok := ignoredPodPrefixes[maybePodOwnerPrefix]; ok {
continue
}

if existingPodData, ok := prefixedPods[maybePodOwnerPrefix]; !ok {
// Fetch pod info from K8s.
pod, err := pf.GetPod(ctx, podName)
if err != nil {
r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", endpoints.Namespace)
errs = multierror.Append(errs, err)
continue
}

// If the pod hasn't been mesh-injected, skip it, as it won't be available as a workload.
if !common.HasBeenMeshInjected(*pod) {
continue
// Store data corresponding to the new selector value, which may be an actual set or exact pod.
podData := podSetData{
podCount: 1,
samplePod: pod,
}

// Add to workload selector values.
// Add pod to workload selector values as appropriate.
// Pods can appear more than once in Endpoints subsets, so we use a set for exact names as well.
if prefix := getOwnerPrefixFromPod(pod); prefix != "" {
podPrefixes[prefix] = true
if common.HasBeenMeshInjected(*pod) {
// Add to the list of pods represented by this prefix. This list is used by
// `getEffectiveTargetPort` to determine the most-used target container port name if the
// k8s service target port is numeric.
prefixedPods[prefix] = &podData
} else {
// If the pod hasn't been mesh-injected, ignore it, as it won't be available as a workload.
// Remember its prefix to avoid fetching its siblings needlessly.
ignoredPodPrefixes[prefix] = true
}
} else {
podExactNames[podName.Name] = true
if common.HasBeenMeshInjected(*pod) {
exactNamePods[podName.Name] = &podData
}
// If the pod hasn't been mesh-injected, ignore it, as it won't be available as a workload.
// No need to remember ignored exact pod names since we don't expect to see them twice.
}
} else {
// We've seen this prefix before.
// Keep track of how many times so that we can choose a container port name if needed later.
existingPodData.podCount += 1
}
}
}
return getWorkloadSelector(podPrefixes, podExactNames), errs

return prefixedPods, exactNamePods, errs
}

// allAddresses combines all Endpoints subset addresses to a single set. Service registration by this controller
Expand Down Expand Up @@ -202,18 +264,13 @@ func getOwnerPrefixFromPod(pod *corev1.Pod) string {
}

// registerService creates a Consul service registration from the provided Kuberetes service and endpoint information.
func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error {
consulSvc := &pbcatalog.Service{
Workloads: selector,
Ports: getServicePorts(service),
VirtualIps: r.getServiceVIPs(service),
}
func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, k8sService corev1.Service, consulSvc *pbcatalog.Service) error {
consulSvcResource := r.getServiceResource(
consulSvc,
service.Name, // Consul and Kubernetes service name will always match
r.getConsulNamespace(service.Namespace),
k8sService.Name, // Consul and Kubernetes service name will always match
r.getConsulNamespace(k8sService.Namespace),
r.getConsulPartition(),
getServiceMeta(service),
getServiceMeta(k8sService),
)

r.Log.Info("registering service with Consul", getLogFieldsForResource(consulSvcResource.Id)...)
Expand Down Expand Up @@ -253,7 +310,7 @@ func getServiceID(name, namespace, partition string) *pbresource.ID {
}

// getServicePorts converts Kubernetes Service ports data into Consul service ports.
func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort {
func getServicePorts(service corev1.Service, prefixedPods selectorPodData, exactNamePods selectorPodData) []*pbcatalog.ServicePort {
ports := make([]*pbcatalog.ServicePort, 0, len(service.Spec.Ports)+1)

for _, p := range service.Spec.Ports {
Expand All @@ -266,10 +323,8 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort {
if p.Protocol == corev1.ProtocolTCP {
ports = append(ports, &pbcatalog.ServicePort{
VirtualPort: uint32(p.Port),
//TODO: If the value is a number, infer the correct name value based on
// the most prevalent endpoint subset for the port (best-effot, inspect a pod).
TargetPort: p.TargetPort.String(),
Protocol: common.GetPortProtocol(p.AppProtocol),
TargetPort: getEffectiveTargetPort(p.TargetPort, prefixedPods, exactNamePods),
Protocol: common.GetPortProtocol(p.AppProtocol),
})
}
}
Expand All @@ -285,6 +340,102 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort {
return ports
}

func getEffectiveTargetPort(targetPort intstr.IntOrString, prefixedPods selectorPodData, exactNamePods selectorPodData) string {
// The Kubernetes service is targeting a port name; use it directly.
// The expected behavior of Kubernetes is that all included Endpoints conform and have a matching named port.
// This is the simplest path and preferred over services targeting by port number.
if targetPort.Type == intstr.String {
return targetPort.String()
}

// The Kubernetes service is targeting a numeric port. This is more complicated for mapping to Consul:
// - Endpoints will contain _all_ selected pods, not just those with a matching declared port number.
// - Consul Workload ports always have a name, so we must determine the best name to match on.
// - There may be more than one option among the pods with named ports, including no name at all.
//
// Our best-effort approach is to find the most prevalent port name among selected pods that _do_ declare the target
// port explicitly in container ports. We'll assume that for each set of pods, the first pod is "representative" -
// i.e. we expect a ReplicaSet to be homogenous. In the vast majority of cases, this means we'll be looking for the
// largest selected ReplicaSet and using the first pod from it.
//
// The goal is to make this determination without fetching all pods belonging to the service, as that would be a
// very expensive operation to repeat every time endpoints change, and we don't expect the target port to change
// often if ever across pod/deployment lifecycles.
//
//TODO in GA, we intend to change port selection to allow for Consul TargetPort to be numeric. If we retain the
// port selection model used here beyond GA, we should consider updating it to also consider pod health, s.t. when
// the selected port name changes between deployments of a ReplicaSet, we route traffic to ports belonging to the
// set most able to serve traffic, rather than simply the largest one.
targetPortInt := int32(targetPort.IntValue())
var mostPrevalentContainerPort *corev1.ContainerPort
maxCount := 0
effectiveNameForPort := func(port *corev1.ContainerPort) string {
if port.Name != "" {
return port.Name
}
return targetPort.String()
}
for _, podData := range prefixedPods {
containerPort := getTargetContainerPort(targetPortInt, podData.samplePod)

// Ignore pods without a declared port matching the service targetPort.
if containerPort == nil {
continue
}

// If this is the most prevalent container port by pod set size, update result.
if maxCount < podData.podCount {
mostPrevalentContainerPort = containerPort
maxCount = podData.podCount
}
}

if mostPrevalentContainerPort != nil {
return effectiveNameForPort(mostPrevalentContainerPort)
}

// If no pod sets have the expected target port, fall back to the most common name among exact-name pods.
// An assumption here is that exact name pods mixed with pod sets will be rare, and sets should be preferred.
if len(exactNamePods) > 0 {
nameCount := make(map[string]int)
for _, podData := range exactNamePods {
if containerPort := getTargetContainerPort(targetPortInt, podData.samplePod); containerPort != nil {
nameCount[effectiveNameForPort(containerPort)] += 1
}
}
if len(nameCount) > 0 {
maxNameCount := 0
mostPrevalentContainerPortName := ""
for name, count := range nameCount {
if maxNameCount < count {
mostPrevalentContainerPortName = name
maxNameCount = count
}
}
return mostPrevalentContainerPortName
}
}

// If still no match for the target port, fall back to string-ifying the target port name, which
// is what the PodController will do when converting unnamed ContainerPorts to Workload ports.
return targetPort.String()
}

// getTargetContainerPort returns the pod ContainerPort matching the given numeric port value, or nil if none is found.
func getTargetContainerPort(targetPort int32, pod *corev1.Pod) *corev1.ContainerPort {
for _, c := range pod.Spec.Containers {
if len(c.Ports) == 0 {
continue
}
for _, p := range c.Ports {
if p.ContainerPort == targetPort && p.Protocol == corev1.ProtocolTCP {
return &p
}
}
}
return nil
}

// getServiceVIPs returns the VIPs to associate with the registered Consul service. This will contain the Kubernetes
// Service ClusterIP if it exists.
//
Expand All @@ -309,18 +460,18 @@ func getServiceMeta(service corev1.Service) map[string]string {

// getWorkloadSelector returns the WorkloadSelector for the given pod name prefixes and exact names.
// It returns nil if the provided name sets are empty.
func getWorkloadSelector(podPrefixes, podExactNames map[string]any) *pbcatalog.WorkloadSelector {
func getWorkloadSelector(prefixedPods selectorPodData, exactNamePods selectorPodData) *pbcatalog.WorkloadSelector {
// If we don't have any values, return nil
if len(podPrefixes) == 0 && len(podExactNames) == 0 {
if len(prefixedPods) == 0 && len(exactNamePods) == 0 {
return nil
}

// Create the WorkloadSelector
workloads := &pbcatalog.WorkloadSelector{}
for v := range podPrefixes {
for v := range prefixedPods {
workloads.Prefixes = append(workloads.Prefixes, v)
}
for v := range podExactNames {
for v := range exactNamePods {
workloads.Names = append(workloads.Names, v)
}

Expand Down
Loading

0 comments on commit f700c2d

Please sign in to comment.