From 78ad3764233f363f98af9f7ad8ab3160fb2b9452 Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Tue, 26 Sep 2023 14:54:24 -0600 Subject: [PATCH] Rename pbmesh.Upstreams to pbmesh.Destinations (#3005) * Rename pbmesh.Upstreams to pbmesh.Destinations * fix traffic perm acceptance tests fixture * more mesh v2 acceptance test fixes * kick off k8s tests * update images --- .../trafficpermissions.yaml | 5 +- acceptance/tests/mesh_v2/mesh_inject_test.go | 14 +- .../common/annotation_processor.go | 86 +++---- .../common/annotation_processor_test.go | 229 +++++++++--------- .../constants/annotations_and_labels.go | 2 +- .../controllers/pod/pod_controller.go | 26 +- .../pod/pod_controller_ent_test.go | 28 +-- .../controllers/pod/pod_controller_test.go | 150 ++++++------ .../connect-inject/webhookv2/container_env.go | 16 +- .../webhookv2/mesh_webhook_test.go | 4 +- control-plane/go.mod | 2 +- control-plane/go.sum | 4 +- 12 files changed, 284 insertions(+), 282 deletions(-) diff --git a/acceptance/tests/fixtures/bases/trafficpermissions/trafficpermissions.yaml b/acceptance/tests/fixtures/bases/trafficpermissions/trafficpermissions.yaml index 48d2ace187..f43bd2d62f 100644 --- a/acceptance/tests/fixtures/bases/trafficpermissions/trafficpermissions.yaml +++ b/acceptance/tests/fixtures/bases/trafficpermissions/trafficpermissions.yaml @@ -9,5 +9,6 @@ spec: destination: identityName: multiport action: allow - sources: - - identityName: static-client + permissions: + - sources: + - identityName: static-client diff --git a/acceptance/tests/mesh_v2/mesh_inject_test.go b/acceptance/tests/mesh_v2/mesh_inject_test.go index 2ee713e520..56cae5640c 100644 --- a/acceptance/tests/mesh_v2/mesh_inject_test.go +++ b/acceptance/tests/mesh_v2/mesh_inject_test.go @@ -34,10 +34,7 @@ func TestMeshInject_MultiportService(t *testing.T) { ctx := suite.Environment().DefaultContext(t) helmValues := map[string]string{ - "global.image": "thisisnotashwin/consul:foo", - "global.imageK8S": "thisisnotashwin/consul-k8s:foo", - "global.imageConsulDataplane": "jmurrethc/consul-dataplane-dev", - "global.experiments[0]": "resource-apis", + "global.experiments[0]": "resource-apis", // The UI is not supported for v2 in 1.17, so for now it must be disabled. "ui.enabled": "false", "connectInject.enabled": "true", @@ -79,16 +76,19 @@ func TestMeshInject_MultiportService(t *testing.T) { { // TODO: once ACLs are implemented, only run this block when secure=true and delete the below line and fixture since the default is deny when secure is true. - k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.NoCleanup, cfg.DebugDirectory, "../../tests/fixtures/cases/trafficpermissions-deny") + k8s.KubectlApplyK(t, ctx.KubectlOptions(t), "../../tests/fixtures/cases/trafficpermissions-deny") // Now test that traffic is denied between the source and the destination. if cfg.EnableTransparentProxy { k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://multiport:8080") k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://multiport:9090") } else { k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://multiport:2345") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:2345") } - k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.NoCleanup, cfg.DebugDirectory, "../../tests/fixtures/bases/trafficpermissions") + k8s.KubectlApplyK(t, ctx.KubectlOptions(t), "../../tests/fixtures/bases/trafficpermissions") + helpers.Cleanup(t, cfg.NoCleanupOnFailure, cfg.NoCleanup, func() { + k8s.KubectlDeleteK(t, ctx.KubectlOptions(t), "../../tests/fixtures/bases/trafficpermissions") + }) } // Check connection from static-client to multiport. diff --git a/control-plane/connect-inject/common/annotation_processor.go b/control-plane/connect-inject/common/annotation_processor.go index 3dcb23842e..778f630049 100644 --- a/control-plane/connect-inject/common/annotation_processor.go +++ b/control-plane/connect-inject/common/annotation_processor.go @@ -19,27 +19,27 @@ const ( ConsulNodeAddress = "127.0.0.1" ) -// ProcessPodUpstreamsForMeshWebhook reads the list of upstreams from the Pod annotation and converts them into a pbmesh.Upstreams +// ProcessPodDestinationsForMeshWebhook reads the list of destinations from the Pod annotation and converts them into a pbmesh.Destinations // object. -func ProcessPodUpstreamsForMeshWebhook(pod corev1.Pod) (*pbmesh.Upstreams, error) { - return ProcessPodUpstreams(pod, true, true) +func ProcessPodDestinationsForMeshWebhook(pod corev1.Pod) (*pbmesh.Destinations, error) { + return ProcessPodDestinations(pod, true, true) } -// ProcessPodUpstreams reads the list of upstreams from the Pod annotation and converts them into a pbmesh.Upstreams +// ProcessPodDestinations reads the list of destinations from the Pod annotation and converts them into a pbmesh.Destinations // object. -func ProcessPodUpstreams(pod corev1.Pod, enablePartitions, enableNamespaces bool) (*pbmesh.Upstreams, error) { - upstreams := &pbmesh.Upstreams{} +func ProcessPodDestinations(pod corev1.Pod, enablePartitions, enableNamespaces bool) (*pbmesh.Destinations, error) { + destinations := &pbmesh.Destinations{} raw, ok := pod.Annotations[constants.AnnotationMeshDestinations] if !ok || raw == "" { return nil, nil } - upstreams.Workloads = &pbcatalog.WorkloadSelector{ + destinations.Workloads = &pbcatalog.WorkloadSelector{ Names: []string{pod.Name}, } for _, raw := range strings.Split(raw, ",") { - var upstream *pbmesh.Upstream + var destination *pbmesh.Destination // Determine the type of processing required unlabeled or labeled // [service-port-name].[service-name].[service-namespace].[service-partition]:[port]:[optional datacenter] @@ -62,25 +62,25 @@ func ProcessPodUpstreams(pod corev1.Pod, enablePartitions, enableNamespaces bool if labeledFormat { var err error - upstream, err = processPodLabeledUpstream(pod, raw, enablePartitions, enableNamespaces) + destination, err = processPodLabeledDestination(pod, raw, enablePartitions, enableNamespaces) if err != nil { - return &pbmesh.Upstreams{}, err + return nil, err } } else { var err error - upstream, err = processPodUnlabeledUpstream(pod, raw, enablePartitions, enableNamespaces) + destination, err = processPodUnlabeledDestination(pod, raw, enablePartitions, enableNamespaces) if err != nil { - return &pbmesh.Upstreams{}, err + return nil, err } } - upstreams.Upstreams = append(upstreams.Upstreams, upstream) + destinations.Destinations = append(destinations.Destinations, destination) } - return upstreams, nil + return destinations, nil } -// processPodLabeledUpstream processes an upstream in the format: +// processPodLabeledDestination processes a destination in the format: // [service-port-name].port.[service-name].svc.[service-namespace].ns.[service-peer].peer:[port] // [service-port-name].port.[service-name].svc.[service-namespace].ns.[service-partition].ap:[port] // [service-port-name].port.[service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port]. @@ -88,12 +88,12 @@ func ProcessPodUpstreams(pod corev1.Pod, enablePartitions, enableNamespaces bool // The ordering matters for labeled as well as unlabeled. The ordering of the labeled parameters should follow // the order and requirements of the unlabeled parameters. // TODO: enable dc and peer support when ready, currently return errors if set. -func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Upstream, error) { +func processPodLabeledDestination(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Destination, error) { parts := strings.SplitN(rawUpstream, ":", 3) var port int32 port, _ = PortValue(pod, strings.TrimSpace(parts[1])) if port <= 0 { - return &pbmesh.Upstream{}, fmt.Errorf("port value %d in upstream is invalid: %s", port, rawUpstream) + return nil, fmt.Errorf("port value %d in destination is invalid: %s", port, rawUpstream) } service := parts[0] @@ -108,37 +108,37 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti case "peer": // TODO: uncomment and remove error when peers supported //peer = strings.TrimSpace(pieces[6]) - return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support peers: %s", rawUpstream) + return nil, fmt.Errorf("destination currently does not support peers: %s", rawUpstream) case "ap": partition = strings.TrimSpace(pieces[6]) case "dc": // TODO: uncomment and remove error when datacenters are supported //datacenter = strings.TrimSpace(pieces[6]) - return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support datacenters: %s", rawUpstream) + return nil, fmt.Errorf("destination currently does not support datacenters: %s", rawUpstream) default: - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } fallthrough case 6: if strings.TrimSpace(pieces[5]) == "ns" { namespace = strings.TrimSpace(pieces[4]) } else { - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } fallthrough case 4: if strings.TrimSpace(pieces[3]) == "svc" { svcName = strings.TrimSpace(pieces[2]) } else { - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } if strings.TrimSpace(pieces[1]) == "port" { portName = strings.TrimSpace(pieces[0]) } else { - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } default: - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } } else { switch len(pieces) { @@ -148,13 +148,13 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti case "peer": // TODO: uncomment and remove error when peers supported //peer = strings.TrimSpace(pieces[4]) - return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support peers: %s", rawUpstream) + return nil, fmt.Errorf("destination currently does not support peers: %s", rawUpstream) case "dc": // TODO: uncomment and remove error when datacenter supported //datacenter = strings.TrimSpace(pieces[4]) - return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support datacenters: %s", rawUpstream) + return nil, fmt.Errorf("destination currently does not support datacenters: %s", rawUpstream) default: - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } // TODO: uncomment and remove error when datacenter and/or peers supported //fallthrough @@ -162,19 +162,19 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti if strings.TrimSpace(pieces[3]) == "svc" { svcName = strings.TrimSpace(pieces[2]) } else { - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } if strings.TrimSpace(pieces[1]) == "port" { portName = strings.TrimSpace(pieces[0]) } else { - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } default: - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } } - upstream := pbmesh.Upstream{ + destination := pbmesh.Destination{ DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, Tenancy: &pbresource.Tenancy{ @@ -186,7 +186,7 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti }, DestinationPort: portName, Datacenter: datacenter, - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(port), Ip: ConsulNodeAddress, @@ -194,24 +194,24 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti }, } - return &upstream, nil + return &destination, nil } -// processPodUnlabeledUpstream processes an upstream in the format: +// processPodUnlabeledDestination processes a destination in the format: // [service-port-name].[service-name].[service-namespace].[service-partition]:[port]:[optional datacenter]. // There is no unlabeled field for peering. // TODO: enable dc and peer support when ready, currently return errors if set. -func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Upstream, error) { +func processPodUnlabeledDestination(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Destination, error) { var portName, datacenter, svcName, namespace, partition string var port int32 - var upstream pbmesh.Upstream + var destination pbmesh.Destination parts := strings.SplitN(rawUpstream, ":", 3) port, _ = PortValue(pod, strings.TrimSpace(parts[1])) // If Consul Namespaces or Admin Partitions are enabled, attempt to parse the - // upstream for a namespace. + // destination for a namespace. if enableNamespaces || enablePartitions { pieces := strings.SplitN(parts[0], ".", 4) switch len(pieces) { @@ -225,12 +225,12 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti svcName = strings.TrimSpace(pieces[1]) portName = strings.TrimSpace(pieces[0]) default: - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } } else { pieces := strings.SplitN(parts[0], ".", 2) if len(pieces) < 2 { - return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream) } svcName = strings.TrimSpace(pieces[1]) portName = strings.TrimSpace(pieces[0]) @@ -240,11 +240,11 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti if len(parts) > 2 { // TODO: uncomment and remove error when datacenters supported //datacenter = strings.TrimSpace(parts[2]) - return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support datacenters: %s", rawUpstream) + return nil, fmt.Errorf("destination currently does not support datacenters: %s", rawUpstream) } if port > 0 { - upstream = pbmesh.Upstream{ + destination = pbmesh.Destination{ DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, Tenancy: &pbresource.Tenancy{ @@ -256,7 +256,7 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti }, DestinationPort: portName, Datacenter: datacenter, - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(port), Ip: ConsulNodeAddress, @@ -264,5 +264,5 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti }, } } - return &upstream, nil + return &destination, nil } diff --git a/control-plane/connect-inject/common/annotation_processor_test.go b/control-plane/connect-inject/common/annotation_processor_test.go index f5d2788488..223067e6c5 100644 --- a/control-plane/connect-inject/common/annotation_processor_test.go +++ b/control-plane/connect-inject/common/annotation_processor_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" "github.com/hashicorp/consul/api" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1" @@ -16,6 +15,8 @@ import ( "google.golang.org/protobuf/testing/protocmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" ) func TestProcessUpstreams(t *testing.T) { @@ -26,7 +27,7 @@ func TestProcessUpstreams(t *testing.T) { cases := []struct { name string pod func() *corev1.Pod - expected *pbmesh.Upstreams + expected *pbmesh.Destinations expErr string configEntry func() api.ConfigEntry consulUnavailable bool @@ -34,16 +35,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled bool }{ { - name: "labeled annotated upstream with svc only", + name: "labeled annotated destination with svc only", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc:1234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -56,7 +57,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -69,18 +70,18 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled annotated upstream with svc and dc", + name: "labeled annotated destination with svc and dc", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.dc1.dc:1234") return pod1 }, - expErr: "upstream currently does not support datacenters: myPort.port.upstream1.svc.dc1.dc:1234", + expErr: "destination currently does not support datacenters: myPort.port.upstream1.svc.dc1.dc:1234", // TODO: uncomment this and remove expErr when datacenters is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -93,7 +94,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "dc1", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -106,18 +107,18 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled annotated upstream with svc and peer", + name: "labeled annotated destination with svc and peer", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.peer1.peer:1234") return pod1 }, - expErr: "upstream currently does not support peers: myPort.port.upstream1.svc.peer1.peer:1234", + expErr: "destination currently does not support peers: myPort.port.upstream1.svc.peer1.peer:1234", // TODO: uncomment this and remove expErr when peers is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -130,7 +131,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -143,18 +144,18 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled annotated upstream with svc, ns, and peer", + name: "labeled annotated destination with svc, ns, and peer", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.peer1.peer:1234") return pod1 }, - expErr: "upstream currently does not support peers: myPort.port.upstream1.svc.ns1.ns.peer1.peer:1234", + expErr: "destination currently does not support peers: myPort.port.upstream1.svc.ns1.ns.peer1.peer:1234", // TODO: uncomment this and remove expErr when peers is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -167,7 +168,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -180,16 +181,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled annotated upstream with svc, ns, and partition", + name: "labeled annotated destination with svc, ns, and partition", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.part1.ap:1234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -202,7 +203,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -215,18 +216,18 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "labeled annotated upstream with svc, ns, and dc", + name: "labeled annotated destination with svc, ns, and dc", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.dc1.dc:1234") return pod1 }, - expErr: "upstream currently does not support datacenters: myPort.port.upstream1.svc.ns1.ns.dc1.dc:1234", + expErr: "destination currently does not support datacenters: myPort.port.upstream1.svc.ns1.ns.dc1.dc:1234", // TODO: uncomment this and remove expErr when datacenters is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -239,7 +240,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "dc1", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -252,16 +253,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled multiple annotated upstreams", + name: "labeled multiple annotated destinations", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns:1234, myPort2.port.upstream2.svc:2234, myPort4.port.upstream4.svc.ns1.ns.ap1.ap:4234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -274,7 +275,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -293,7 +294,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort2", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(2234), Ip: ConsulNodeAddress, @@ -312,7 +313,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort4", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(4234), Ip: ConsulNodeAddress, @@ -325,18 +326,18 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "labeled multiple annotated upstreams with dcs and peers", + name: "labeled multiple annotated destinations with dcs and peers", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.dc1.dc:1234, myPort2.port.upstream2.svc:2234, myPort3.port.upstream3.svc.ns1.ns:3234, myPort4.port.upstream4.svc.ns1.ns.peer1.peer:4234") return pod1 }, - expErr: "upstream currently does not support datacenters: myPort.port.upstream1.svc.ns1.ns.dc1.dc:1234", + expErr: "destination currently does not support datacenters: myPort.port.upstream1.svc.ns1.ns.dc1.dc:1234", // TODO: uncomment this and remove expErr when datacenters is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -349,7 +350,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "dc1", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -368,7 +369,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort2", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(2234), // Ip: ConsulNodeAddress, @@ -387,7 +388,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort3", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(3234), // Ip: ConsulNodeAddress, @@ -406,7 +407,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort4", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(4234), // Ip: ConsulNodeAddress, @@ -419,132 +420,132 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "error labeled annotated upstream error: invalid partition/dc/peer", + name: "error labeled annotated destination error: invalid partition/dc/peer", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.part1.err:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.err:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.err:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream with svc and peer, needs ns before peer if namespaces enabled", + name: "error labeled annotated destination with svc and peer, needs ns before peer if namespaces enabled", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.peer1.peer:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.peer1.peer:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.peer1.peer:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: invalid namespace", + name: "error labeled annotated destination error: invalid namespace", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.err:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.ns1.err:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.ns1.err:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: invalid number of pieces in the address", + name: "error labeled annotated destination error: invalid number of pieces in the address", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.err:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.err:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.err:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: invalid peer", + name: "error labeled annotated destination error: invalid peer", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.peer1.err:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.peer1.err:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.peer1.err:1234", consulNamespacesEnabled: false, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: invalid number of pieces in the address without namespaces and partitions", + name: "error labeled annotated destination error: invalid number of pieces in the address without namespaces and partitions", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.err:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.err:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.err:1234", consulNamespacesEnabled: false, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: both peer and partition provided", + name: "error labeled annotated destination error: both peer and partition provided", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.part1.partition.peer1.peer:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.partition.peer1.peer:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.partition.peer1.peer:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: true, }, { - name: "error labeled annotated upstream error: both peer and dc provided", + name: "error labeled annotated destination error: both peer and dc provided", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.peer1.peer.dc1.dc:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.ns1.ns.peer1.peer.dc1.dc:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.ns1.ns.peer1.peer.dc1.dc:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: both dc and partition provided", + name: "error labeled annotated destination error: both dc and partition provided", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns.part1.partition.dc1.dc:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.partition.dc1.dc:1234", + expErr: "destination structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.partition.dc1.dc:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: true, }, { - name: "error labeled annotated upstream error: wrong ordering for port and svc with namespace partition enabled", + name: "error labeled annotated destination error: wrong ordering for port and svc with namespace partition enabled", pod: func() *corev1.Pod { pod1 := createPod(podName, "upstream1.svc.myPort.port.ns1.ns.part1.partition.dc1.dc:1234") return pod1 }, - expErr: "upstream structured incorrectly: upstream1.svc.myPort.port.ns1.ns.part1.partition.dc1.dc:1234", + expErr: "destination structured incorrectly: upstream1.svc.myPort.port.ns1.ns.part1.partition.dc1.dc:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: true, }, { - name: "error labeled annotated upstream error: wrong ordering for port and svc with namespace partition disabled", + name: "error labeled annotated destination error: wrong ordering for port and svc with namespace partition disabled", pod: func() *corev1.Pod { pod1 := createPod(podName, "upstream1.svc.myPort.port:1234") return pod1 }, - expErr: "upstream structured incorrectly: upstream1.svc.myPort.port:1234", + expErr: "destination structured incorrectly: upstream1.svc.myPort.port:1234", consulNamespacesEnabled: false, consulPartitionsEnabled: false, }, { - name: "error labeled annotated upstream error: incorrect key name namespace partition enabled", + name: "error labeled annotated destination error: incorrect key name namespace partition enabled", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.portage.upstream1.svc.ns1.ns.part1.partition.dc1.dc:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.portage.upstream1.svc.ns1.ns.part1.partition.dc1.dc:1234", + expErr: "destination structured incorrectly: myPort.portage.upstream1.svc.ns1.ns.part1.partition.dc1.dc:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: true, }, { - name: "error labeled annotated upstream error: incorrect key name namespace partition disabled", + name: "error labeled annotated destination error: incorrect key name namespace partition disabled", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.portage.upstream1.svc:1234") return pod1 }, - expErr: "upstream structured incorrectly: myPort.portage.upstream1.svc:1234", + expErr: "destination structured incorrectly: myPort.portage.upstream1.svc:1234", consulNamespacesEnabled: false, consulPartitionsEnabled: false, }, @@ -554,7 +555,7 @@ func TestProcessUpstreams(t *testing.T) { pod1 := createPod(podName, "upstream1.svc:1234") return pod1 }, - expErr: "upstream structured incorrectly: upstream1.svc:1234", + expErr: "destination structured incorrectly: upstream1.svc:1234", consulNamespacesEnabled: false, consulPartitionsEnabled: false, }, @@ -564,21 +565,21 @@ func TestProcessUpstreams(t *testing.T) { pod1 := createPod(podName, "upstream1.svc:1234") return pod1 }, - expErr: "upstream structured incorrectly: upstream1.svc:1234", + expErr: "destination structured incorrectly: upstream1.svc:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: true, }, { - name: "unlabeled and labeled multiple annotated upstreams", + name: "unlabeled and labeled multiple annotated destinations", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.port.upstream1.svc.ns1.ns:1234, myPort2.upstream2:2234, myPort4.port.upstream4.svc.ns1.ns.ap1.ap:4234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -591,7 +592,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -610,7 +611,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort2", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(2234), Ip: ConsulNodeAddress, @@ -629,7 +630,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort4", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(4234), Ip: ConsulNodeAddress, @@ -642,16 +643,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "unlabeled single upstream", + name: "unlabeled single destination", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.upstream:1234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -664,7 +665,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -677,16 +678,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "unlabeled single upstream with namespace", + name: "unlabeled single destination with namespace", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.upstream.foo:1234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -699,7 +700,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -712,16 +713,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "unlabeled single upstream with namespace and partition", + name: "unlabeled single destination with namespace and partition", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.upstream.foo.bar:1234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -734,7 +735,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -747,16 +748,16 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "unlabeled multiple upstreams", + name: "unlabeled multiple destinations", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.upstream1:1234, myPort2.upstream2:2234") return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -769,7 +770,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: ConsulNodeAddress, @@ -788,7 +789,7 @@ func TestProcessUpstreams(t *testing.T) { }, DestinationPort: "myPort2", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(2234), Ip: ConsulNodeAddress, @@ -801,7 +802,7 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "unlabeled multiple upstreams with consul namespaces, partitions and datacenters", + name: "unlabeled multiple destinations with consul namespaces, partitions and datacenters", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.upstream1:1234, myPort2.upstream2.bar:2234, myPort3.upstream3.foo.baz:3234:dc2") return pod1 @@ -812,13 +813,13 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = "remote" return pd }, - expErr: "upstream currently does not support datacenters: myPort3.upstream3.foo.baz:3234:dc2", + expErr: "destination currently does not support datacenters: myPort3.upstream3.foo.baz:3234:dc2", // TODO: uncomment this and remove expErr when datacenters is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -831,7 +832,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -850,7 +851,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort2", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(2234), // Ip: ConsulNodeAddress, @@ -869,7 +870,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort3", // Datacenter: "dc2", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(3234), // Ip: ConsulNodeAddress, @@ -882,7 +883,7 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "unlabeled multiple upstreams with consul namespaces and datacenters", + name: "unlabeled multiple destinations with consul namespaces and datacenters", pod: func() *corev1.Pod { pod1 := createPod(podName, "myPort.upstream1:1234, myPort2.upstream2.bar:2234, myPort3.upstream3.foo:3234:dc2") return pod1 @@ -893,13 +894,13 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = "remote" return pd }, - expErr: "upstream currently does not support datacenters: myPort3.upstream3.foo:3234:dc2", + expErr: "destination currently does not support datacenters: myPort3.upstream3.foo:3234:dc2", // TODO: uncomment this and remove expErr when datacenters is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Upstreams: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -912,7 +913,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: ConsulNodeAddress, @@ -931,7 +932,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort2", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(2234), // Ip: ConsulNodeAddress, @@ -950,7 +951,7 @@ func TestProcessUpstreams(t *testing.T) { // }, // DestinationPort: "myPort3", // Datacenter: "dc2", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(3234), // Ip: ConsulNodeAddress, @@ -967,7 +968,7 @@ func TestProcessUpstreams(t *testing.T) { pod1 := createPod(podName, "upstream1:1234") return pod1 }, - expErr: "upstream structured incorrectly: upstream1:1234", + expErr: "destination structured incorrectly: upstream1:1234", consulNamespacesEnabled: false, consulPartitionsEnabled: false, }, @@ -977,21 +978,21 @@ func TestProcessUpstreams(t *testing.T) { pod1 := createPod(podName, "upstream1:1234") return pod1 }, - expErr: "upstream structured incorrectly: upstream1:1234", + expErr: "destination structured incorrectly: upstream1:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: true, }, } for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - upstreams, err := ProcessPodUpstreams(*tt.pod(), tt.consulNamespacesEnabled, tt.consulPartitionsEnabled) + destinations, err := ProcessPodDestinations(*tt.pod(), tt.consulNamespacesEnabled, tt.consulPartitionsEnabled) if tt.expErr != "" { require.EqualError(t, err, tt.expErr) } else { require.NoError(t, err) - require.Equal(t, tt.expected, upstreams) + require.Equal(t, tt.expected, destinations) - if diff := cmp.Diff(tt.expected, upstreams, protocmp.Transform()); diff != "" { + if diff := cmp.Diff(tt.expected, destinations, protocmp.Transform()); diff != "" { t.Errorf("unexpected difference:\n%v", diff) } } diff --git a/control-plane/connect-inject/constants/annotations_and_labels.go b/control-plane/connect-inject/constants/annotations_and_labels.go index 0bab084b1f..823776e577 100644 --- a/control-plane/connect-inject/constants/annotations_and_labels.go +++ b/control-plane/connect-inject/constants/annotations_and_labels.go @@ -234,7 +234,7 @@ const ( // of resources. ManagedByServiceAccountValue = "consul-k8s-service-account-controller" - // AnnotationMeshDestinations is a list of upstreams to register with the + // AnnotationMeshDestinations is a list of destinations to register with the // proxy. The service name should map to a Consul service name and the local // port is the local port in the pod that the listener will bind to. It can // be a named port. diff --git a/control-plane/connect-inject/controllers/pod/pod_controller.go b/control-plane/connect-inject/controllers/pod/pod_controller.go index b101b8c745..6a44c20f4c 100644 --- a/control-plane/connect-inject/controllers/pod/pod_controller.go +++ b/control-plane/connect-inject/controllers/pod/pod_controller.go @@ -110,8 +110,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // TODO: clean up ACL Tokens - // Delete upstreams, if any exist - if err := r.deleteUpstreams(ctx, req.NamespacedName); err != nil { + // Delete destinations, if any exist + if err := r.deleteDestinations(ctx, req.NamespacedName); err != nil { errs = multierror.Append(errs, err) } @@ -152,8 +152,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu errs = multierror.Append(errs, err) } - // Create explicit upstreams (if any exist) - if err := r.writeUpstreams(ctx, pod); err != nil { + // Create explicit destinations (if any exist) + if err := r.writeDestinations(ctx, pod); err != nil { // Technically this is not needed, but keeping in case this gets refactored in // a different order if inject.ConsulNamespaceIsNotFound(err) { @@ -449,11 +449,11 @@ func (r *Controller) writeHealthStatus(ctx context.Context, pod corev1.Pod) erro // has been configured with and will only delete tokens for the provided podName. // func (r *Controller) deleteACLTokensForWorkload(apiClient *api.Client, svc *api.AgentService, k8sNS, podName string) error { -// writeUpstreams will write explicit upstreams if pod annotations exist. -func (r *Controller) writeUpstreams(ctx context.Context, pod corev1.Pod) error { - uss, err := inject.ProcessPodUpstreams(pod, r.EnableConsulPartitions, r.EnableConsulNamespaces) +// writeDestinations will write explicit destinations if pod annotations exist. +func (r *Controller) writeDestinations(ctx context.Context, pod corev1.Pod) error { + uss, err := inject.ProcessPodDestinations(pod, r.EnableConsulPartitions, r.EnableConsulNamespaces) if err != nil { - return fmt.Errorf("error processing upstream annotations: %s", err.Error()) + return fmt.Errorf("error processing destination annotations: %s", err.Error()) } if uss == nil { return nil @@ -462,7 +462,7 @@ func (r *Controller) writeUpstreams(ctx context.Context, pod corev1.Pod) error { data := inject.ToProtoAny(uss) req := &pbresource.WriteRequest{ Resource: &pbresource.Resource{ - Id: getUpstreamsID(pod.GetName(), r.getConsulNamespace(pod.Namespace), r.getPartition()), + Id: getDestinationsID(pod.GetName(), r.getConsulNamespace(pod.Namespace), r.getPartition()), Metadata: metaFromPod(pod), Data: data, }, @@ -472,9 +472,9 @@ func (r *Controller) writeUpstreams(ctx context.Context, pod corev1.Pod) error { return err } -func (r *Controller) deleteUpstreams(ctx context.Context, pod types.NamespacedName) error { +func (r *Controller) deleteDestinations(ctx context.Context, pod types.NamespacedName) error { req := &pbresource.DeleteRequest{ - Id: getUpstreamsID(pod.Name, r.getConsulNamespace(pod.Namespace), r.getPartition()), + Id: getDestinationsID(pod.Name, r.getConsulNamespace(pod.Namespace), r.getPartition()), } _, err := r.ResourceClient.Delete(ctx, req) @@ -637,10 +637,10 @@ func getHealthStatusID(name, namespace, partition string) *pbresource.ID { } } -func getUpstreamsID(name, namespace, partition string) *pbresource.ID { +func getDestinationsID(name, namespace, partition string) *pbresource.ID { return &pbresource.ID{ Name: name, - Type: pbmesh.UpstreamsType, + Type: pbmesh.DestinationsType, Tenancy: &pbresource.Tenancy{ Partition: partition, Namespace: namespace, diff --git a/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go b/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go index 9b0f45c3fe..94bce9b29e 100644 --- a/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go +++ b/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go @@ -59,14 +59,14 @@ type testCase struct { existingWorkload *pbcatalog.Workload existingHealthStatus *pbcatalog.HealthStatus existingProxyConfiguration *pbmesh.ProxyConfiguration - existingUpstreams *pbmesh.Upstreams + existingDestinations *pbmesh.Destinations // Expected Consul state. expectedConsulNamespace string // This namespace will be used to query Consul for the results expectedWorkload *pbcatalog.Workload expectedHealthStatus *pbcatalog.HealthStatus expectedProxyConfiguration *pbmesh.ProxyConfiguration - expectedUpstreams *pbmesh.Upstreams + expectedDestinations *pbmesh.Destinations // Reconcile loop outputs expErr string @@ -173,7 +173,7 @@ func TestReconcileCreatePodWithMirrorNamespaces(t *testing.T) { expectedHealthStatus: createPassingHealthStatus(), }, { - name: "new pod with explicit upstreams, ns and partition", + name: "new pod with explicit destinations, ns and partition", podName: testPodName, partition: constants.DefaultConsulPartition, @@ -194,7 +194,7 @@ func TestReconcileCreatePodWithMirrorNamespaces(t *testing.T) { expectedWorkload: createWorkload(), expectedHealthStatus: createPassingHealthStatus(), expectedProxyConfiguration: createProxyConfiguration(testPodName, pbmesh.ProxyMode_PROXY_MODE_DEFAULT), - expectedUpstreams: createUpstreams(), + expectedDestinations: createDestinations(), }, { name: "namespace in Consul does not exist", @@ -315,7 +315,7 @@ func TestReconcileDeletePodWithMirrorNamespaces(t *testing.T) { expectedConsulNamespace: "bar", }, { - name: "delete pod w/ explicit upstreams", + name: "delete pod w/ explicit destinations", podName: testPodName, podNamespace: "bar", partition: testPartition, @@ -330,7 +330,7 @@ func TestReconcileDeletePodWithMirrorNamespaces(t *testing.T) { existingWorkload: createWorkload(), existingHealthStatus: createPassingHealthStatus(), existingProxyConfiguration: createProxyConfiguration(testPodName, pbmesh.ProxyMode_PROXY_MODE_DEFAULT), - existingUpstreams: createUpstreams(), + existingDestinations: createDestinations(), expectedConsulNamespace: "bar", }, @@ -417,7 +417,7 @@ func TestReconcileCreatePodWithDestinationNamespace(t *testing.T) { expectedProxyConfiguration: createProxyConfiguration(testPodName, pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT), }, { - name: "new pod with explicit upstreams, ns and partition", + name: "new pod with explicit destinations, ns and partition", podName: testPodName, partition: constants.DefaultConsulPartition, @@ -439,7 +439,7 @@ func TestReconcileCreatePodWithDestinationNamespace(t *testing.T) { expectedWorkload: createWorkload(), expectedHealthStatus: createPassingHealthStatus(), expectedProxyConfiguration: createProxyConfiguration(testPodName, pbmesh.ProxyMode_PROXY_MODE_DEFAULT), - expectedUpstreams: createUpstreams(), + expectedDestinations: createDestinations(), }, { name: "kitchen sink new pod, non-default ns and partition", @@ -585,7 +585,7 @@ func TestReconcileDeletePodWithDestinationNamespace(t *testing.T) { expectedConsulNamespace: "a-penguin-walks-into-a-bar", }, { - name: "delete pod with explicit upstreams", + name: "delete pod with explicit destinations", podName: testPodName, podNamespace: "bar", partition: testPartition, @@ -600,7 +600,7 @@ func TestReconcileDeletePodWithDestinationNamespace(t *testing.T) { existingWorkload: createWorkload(), existingHealthStatus: createPassingHealthStatus(), existingProxyConfiguration: createProxyConfiguration(testPodName, pbmesh.ProxyMode_PROXY_MODE_DEFAULT), - existingUpstreams: createUpstreams(), + existingDestinations: createDestinations(), expectedConsulNamespace: "a-penguin-walks-into-a-bar", }, @@ -750,8 +750,8 @@ func runControllerTest(t *testing.T, tc testCase) { loadResource( t, resourceClient, - getUpstreamsID(tc.podName, tc.expectedConsulNamespace, tc.partition), - tc.existingUpstreams, + getDestinationsID(tc.podName, tc.expectedConsulNamespace, tc.partition), + tc.existingDestinations, nil) namespacedName := types.NamespacedName{ @@ -779,6 +779,6 @@ func runControllerTest(t *testing.T, tc testCase) { pcID := getProxyConfigurationID(tc.podName, tc.expectedConsulNamespace, tc.partition) expectedProxyConfigurationMatches(t, resourceClient, pcID, tc.expectedProxyConfiguration) - uID := getUpstreamsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, tc.expectedUpstreams) + uID := getDestinationsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, tc.expectedDestinations) } diff --git a/control-plane/connect-inject/controllers/pod/pod_controller_test.go b/control-plane/connect-inject/controllers/pod/pod_controller_test.go index 8928b9fc18..456f8fbfaf 100644 --- a/control-plane/connect-inject/controllers/pod/pod_controller_test.go +++ b/control-plane/connect-inject/controllers/pod/pod_controller_test.go @@ -761,9 +761,9 @@ func TestProxyConfigurationDelete(t *testing.T) { } } -// TestUpstreamsWrite does a subsampling of tests covered in TestProcessUpstreams to make sure things are hooked up +// TestDestinationsWrite does a subsampling of tests covered in TestProcessUpstreams to make sure things are hooked up // correctly. For the sake of test speed, more exhaustive testing is performed in TestProcessUpstreams. -func TestUpstreamsWrite(t *testing.T) { +func TestDestinationsWrite(t *testing.T) { t.Parallel() const podName = "pod1" @@ -771,23 +771,23 @@ func TestUpstreamsWrite(t *testing.T) { cases := []struct { name string pod func() *corev1.Pod - expected *pbmesh.Upstreams + expected *pbmesh.Destinations expErr string consulNamespacesEnabled bool consulPartitionsEnabled bool }{ { - name: "labeled annotated upstream with svc only", + name: "labeled annotated destination with svc only", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.port.upstream1.svc:1234" return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -800,7 +800,7 @@ func TestUpstreamsWrite(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: consulNodeAddress, @@ -813,19 +813,19 @@ func TestUpstreamsWrite(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled annotated upstream with svc, ns, and peer", + name: "labeled annotated destination with svc, ns, and peer", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.port.upstream1.svc.ns1.ns.peer1.peer:1234" return pod1 }, - expErr: "error processing upstream annotations: upstream currently does not support peers: myPort.port.upstream1.svc.ns1.ns.peer1.peer:1234", + expErr: "error processing destination annotations: destination currently does not support peers: myPort.port.upstream1.svc.ns1.ns.peer1.peer:1234", // TODO: uncomment this and remove expErr when peers is supported - //expected: &pbmesh.Upstreams{ + //expected: &pbmesh.Destinations{ // Workloads: &pbcatalog.WorkloadSelector{ // Names: []string{podName}, // }, - // Upstreams: []*pbmesh.Upstream{ + // Destinations: []*pbmesh.Destination{ // { // DestinationRef: &pbresource.Reference{ // Type: pbcatalog.ServiceType, @@ -838,7 +838,7 @@ func TestUpstreamsWrite(t *testing.T) { // }, // DestinationPort: "myPort", // Datacenter: "", - // ListenAddr: &pbmesh.Upstream_IpPort{ + // ListenAddr: &pbmesh.Destination_IpPort{ // IpPort: &pbmesh.IPPortAddress{ // Port: uint32(1234), // Ip: consulNodeAddress, @@ -851,17 +851,17 @@ func TestUpstreamsWrite(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "labeled annotated upstream with svc, ns, and partition", + name: "labeled annotated destination with svc, ns, and partition", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.port.upstream1.svc.ns1.ns.part1.ap:1234" return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -874,7 +874,7 @@ func TestUpstreamsWrite(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: consulNodeAddress, @@ -887,28 +887,28 @@ func TestUpstreamsWrite(t *testing.T) { consulPartitionsEnabled: true, }, { - name: "error labeled annotated upstream error: invalid partition/dc/peer", + name: "error labeled annotated destination error: invalid partition/dc/peer", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.port.upstream1.svc.ns1.ns.part1.err:1234" return pod1 }, - expErr: "error processing upstream annotations: upstream structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.err:1234", + expErr: "error processing destination annotations: destination structured incorrectly: myPort.port.upstream1.svc.ns1.ns.part1.err:1234", consulNamespacesEnabled: true, consulPartitionsEnabled: false, }, { - name: "unlabeled single upstream", + name: "unlabeled single destination", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.upstream:1234" return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -921,7 +921,7 @@ func TestUpstreamsWrite(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: consulNodeAddress, @@ -934,17 +934,17 @@ func TestUpstreamsWrite(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "unlabeled single upstream with namespace and partition", + name: "unlabeled single destination with namespace and partition", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.upstream.foo.bar:1234" return pod1 }, - expected: &pbmesh.Upstreams{ + expected: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -957,7 +957,7 @@ func TestUpstreamsWrite(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: consulNodeAddress, @@ -992,44 +992,44 @@ func TestUpstreamsWrite(t *testing.T) { ResourceClient: resourceClient, } - err = pc.writeUpstreams(context.Background(), *tt.pod()) + err = pc.writeDestinations(context.Background(), *tt.pod()) if tt.expErr != "" { require.EqualError(t, err, tt.expErr) } else { require.NoError(t, err) - uID := getUpstreamsID(tt.pod().Name, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, tt.expected) + uID := getDestinationsID(tt.pod().Name, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, tt.expected) } }) } } -func TestUpstreamsDelete(t *testing.T) { +func TestDestinationsDelete(t *testing.T) { t.Parallel() const podName = "pod1" cases := []struct { - name string - pod func() *corev1.Pod - existingUpstreams *pbmesh.Upstreams - expErr string - configEntry func() api.ConfigEntry - consulUnavailable bool + name string + pod func() *corev1.Pod + existingDestinations *pbmesh.Destinations + expErr string + configEntry func() api.ConfigEntry + consulUnavailable bool }{ { - name: "labeled annotated upstream with svc only", + name: "labeled annotated destination with svc only", pod: func() *corev1.Pod { pod1 := createPod(podName, "", true, true) pod1.Annotations[constants.AnnotationMeshDestinations] = "myPort.port.upstream1.svc:1234" return pod1 }, - existingUpstreams: &pbmesh.Upstreams{ + existingDestinations: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{podName}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -1042,7 +1042,7 @@ func TestUpstreamsDelete(t *testing.T) { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: consulNodeAddress, @@ -1074,23 +1074,23 @@ func TestUpstreamsDelete(t *testing.T) { // Load in the upstream for us to delete and check that it's there loadResource(t, resourceClient, - getUpstreamsID(tt.pod().Name, constants.DefaultConsulNS, constants.DefaultConsulPartition), - tt.existingUpstreams, + getDestinationsID(tt.pod().Name, constants.DefaultConsulNS, constants.DefaultConsulPartition), + tt.existingDestinations, nil) - uID := getUpstreamsID(tt.pod().Name, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, tt.existingUpstreams) + uID := getDestinationsID(tt.pod().Name, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, tt.existingDestinations) // Delete the upstream nn := types.NamespacedName{Name: tt.pod().Name} - err = pc.deleteUpstreams(context.Background(), nn) + err = pc.deleteDestinations(context.Background(), nn) // Verify the upstream has been deleted or that an expected error has been returned if tt.expErr != "" { require.EqualError(t, err, tt.expErr) } else { require.NoError(t, err) - uID := getUpstreamsID(tt.pod().Name, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, nil) + uID := getDestinationsID(tt.pod().Name, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, nil) } }) } @@ -1119,7 +1119,7 @@ func TestReconcileCreatePod(t *testing.T) { expectedWorkload *pbcatalog.Workload expectedHealthStatus *pbcatalog.HealthStatus expectedProxyConfiguration *pbmesh.ProxyConfiguration - expectedUpstreams *pbmesh.Upstreams + expectedDestinations *pbmesh.Destinations tproxy bool overwriteProbes bool @@ -1197,8 +1197,8 @@ func TestReconcileCreatePod(t *testing.T) { pcID := getProxyConfigurationID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) expectedProxyConfigurationMatches(t, resourceClient, pcID, tc.expectedProxyConfiguration) - uID := getUpstreamsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, tc.expectedUpstreams) + uID := getDestinationsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, tc.expectedDestinations) } testCases := []testCase{ @@ -1276,7 +1276,7 @@ func TestReconcileCreatePod(t *testing.T) { expectedWorkload: createWorkload(), expectedHealthStatus: createPassingHealthStatus(), expectedProxyConfiguration: createProxyConfiguration("foo", pbmesh.ProxyMode_PROXY_MODE_DEFAULT), - expectedUpstreams: createUpstreams(), + expectedDestinations: createDestinations(), }, // TODO: make sure multi-error accumulates errors } @@ -1310,12 +1310,12 @@ func TestReconcileUpdatePod(t *testing.T) { existingWorkload *pbcatalog.Workload existingHealthStatus *pbcatalog.HealthStatus existingProxyConfiguration *pbmesh.ProxyConfiguration - existingUpstreams *pbmesh.Upstreams + existingDestinations *pbmesh.Destinations expectedWorkload *pbcatalog.Workload expectedHealthStatus *pbcatalog.HealthStatus expectedProxyConfiguration *pbmesh.ProxyConfiguration - expectedUpstreams *pbmesh.Upstreams + expectedDestinations *pbmesh.Destinations tproxy bool overwriteProbes bool @@ -1384,8 +1384,8 @@ func TestReconcileUpdatePod(t *testing.T) { nil) loadResource(t, resourceClient, - getUpstreamsID(tc.podName, constants.DefaultConsulNS, constants.DefaultConsulPartition), - tc.existingUpstreams, + getDestinationsID(tc.podName, constants.DefaultConsulNS, constants.DefaultConsulPartition), + tc.existingDestinations, nil) namespacedName := types.NamespacedName{ @@ -1412,8 +1412,8 @@ func TestReconcileUpdatePod(t *testing.T) { pcID := getProxyConfigurationID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) expectedProxyConfigurationMatches(t, resourceClient, pcID, tc.expectedProxyConfiguration) - uID := getUpstreamsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, tc.expectedUpstreams) + uID := getDestinationsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, tc.expectedDestinations) } testCases := []testCase{ @@ -1506,7 +1506,7 @@ func TestReconcileUpdatePod(t *testing.T) { }, }, { - name: "pod update explicit upstreams", + name: "pod update explicit destination", podName: "foo", k8sObjects: func() []runtime.Object { pod := createPod("foo", "", true, true) @@ -1515,11 +1515,11 @@ func TestReconcileUpdatePod(t *testing.T) { }, existingWorkload: createWorkload(), existingHealthStatus: createPassingHealthStatus(), - existingUpstreams: &pbmesh.Upstreams{ + existingDestinations: &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{"foo"}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -1532,7 +1532,7 @@ func TestReconcileUpdatePod(t *testing.T) { }, DestinationPort: "myPort2", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(1234), Ip: consulNodeAddress, @@ -1543,7 +1543,7 @@ func TestReconcileUpdatePod(t *testing.T) { }, expectedWorkload: createWorkload(), expectedHealthStatus: createPassingHealthStatus(), - expectedUpstreams: createUpstreams(), + expectedDestinations: createDestinations(), }, } @@ -1574,12 +1574,12 @@ func TestReconcileDeletePod(t *testing.T) { existingWorkload *pbcatalog.Workload existingHealthStatus *pbcatalog.HealthStatus existingProxyConfiguration *pbmesh.ProxyConfiguration - existingUpstreams *pbmesh.Upstreams + existingDestinations *pbmesh.Destinations expectedWorkload *pbcatalog.Workload expectedHealthStatus *pbcatalog.HealthStatus expectedProxyConfiguration *pbmesh.ProxyConfiguration - expectedUpstreams *pbmesh.Upstreams + expectedDestinations *pbmesh.Destinations aclsEnabled bool @@ -1641,8 +1641,8 @@ func TestReconcileDeletePod(t *testing.T) { loadResource( t, resourceClient, - getUpstreamsID(tc.podName, constants.DefaultConsulNS, constants.DefaultConsulPartition), - tc.existingUpstreams, + getDestinationsID(tc.podName, constants.DefaultConsulNS, constants.DefaultConsulPartition), + tc.existingDestinations, nil) namespacedName := types.NamespacedName{ @@ -1669,8 +1669,8 @@ func TestReconcileDeletePod(t *testing.T) { pcID := getProxyConfigurationID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) expectedProxyConfigurationMatches(t, resourceClient, pcID, tc.expectedProxyConfiguration) - uID := getUpstreamsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) - expectedUpstreamMatches(t, resourceClient, uID, tc.expectedUpstreams) + uID := getDestinationsID(tc.podName, metav1.NamespaceDefault, constants.DefaultConsulPartition) + expectedDestinationMatches(t, resourceClient, uID, tc.expectedDestinations) } testCases := []testCase{ @@ -1687,7 +1687,7 @@ func TestReconcileDeletePod(t *testing.T) { existingWorkload: createWorkload(), existingHealthStatus: createPassingHealthStatus(), existingProxyConfiguration: createProxyConfiguration("foo", pbmesh.ProxyMode_PROXY_MODE_DEFAULT), - existingUpstreams: createUpstreams(), + existingDestinations: createDestinations(), }, // TODO: enable ACLs and make sure they are deleted } @@ -1869,12 +1869,12 @@ func createProxyConfiguration(podName string, mode pbmesh.ProxyMode) *pbmesh.Pro } // createCriticalHealthStatus creates a failing HealthStatus that matches the pod from createPod. -func createUpstreams() *pbmesh.Upstreams { - return &pbmesh.Upstreams{ +func createDestinations() *pbmesh.Destinations { + return &pbmesh.Destinations{ Workloads: &pbcatalog.WorkloadSelector{ Names: []string{"foo"}, }, - Upstreams: []*pbmesh.Upstream{ + Destinations: []*pbmesh.Destination{ { DestinationRef: &pbresource.Reference{ Type: pbcatalog.ServiceType, @@ -1887,7 +1887,7 @@ func createUpstreams() *pbmesh.Upstreams { }, DestinationPort: "myPort", Datacenter: "", - ListenAddr: &pbmesh.Upstream_IpPort{ + ListenAddr: &pbmesh.Destination_IpPort{ IpPort: &pbmesh.IPPortAddress{ Port: uint32(24601), Ip: consulNodeAddress, @@ -1984,7 +1984,7 @@ func expectedProxyConfigurationMatches(t *testing.T, client pbresource.ResourceS require.Equal(t, "", diff, "ProxyConfigurations do not match") } -func expectedUpstreamMatches(t *testing.T, client pbresource.ResourceServiceClient, id *pbresource.ID, expectedUpstreams *pbmesh.Upstreams) { +func expectedDestinationMatches(t *testing.T, client pbresource.ResourceServiceClient, id *pbresource.ID, expectedUpstreams *pbmesh.Destinations) { req := &pbresource.ReadRequest{Id: id} res, err := client.Read(context.Background(), req) @@ -2003,7 +2003,7 @@ func expectedUpstreamMatches(t *testing.T, client pbresource.ResourceServiceClie require.NotNil(t, res.GetResource().GetData()) - actualUpstreams := &pbmesh.Upstreams{} + actualUpstreams := &pbmesh.Destinations{} err = res.GetResource().GetData().UnmarshalTo(actualUpstreams) require.NoError(t, err) diff --git a/control-plane/connect-inject/webhookv2/container_env.go b/control-plane/connect-inject/webhookv2/container_env.go index 568a07f124..b612b3c6aa 100644 --- a/control-plane/connect-inject/webhookv2/container_env.go +++ b/control-plane/connect-inject/webhookv2/container_env.go @@ -14,27 +14,27 @@ import ( ) func (w *MeshWebhook) containerEnvVars(pod corev1.Pod) ([]corev1.EnvVar, error) { - upstreams, err := common.ProcessPodUpstreamsForMeshWebhook(pod) + destinations, err := common.ProcessPodDestinationsForMeshWebhook(pod) if err != nil { - return nil, fmt.Errorf("error processing the upstream for container environment variable creation: %s", err.Error()) + return nil, fmt.Errorf("error processing the destination for container environment variable creation: %s", err.Error()) } - if upstreams == nil { + if destinations == nil { return nil, nil } var result []corev1.EnvVar - for _, upstream := range upstreams.Upstreams { - serviceName := strings.TrimSpace(upstream.DestinationRef.Name) + for _, destination := range destinations.Destinations { + serviceName := strings.TrimSpace(destination.DestinationRef.Name) serviceName = strings.ToUpper(strings.Replace(serviceName, "-", "_", -1)) - portName := strings.TrimSpace(upstream.DestinationPort) + portName := strings.TrimSpace(destination.DestinationPort) portName = strings.ToUpper(strings.Replace(portName, "-", "_", -1)) result = append(result, corev1.EnvVar{ Name: fmt.Sprintf("%s_%s_CONNECT_SERVICE_HOST", serviceName, portName), - Value: upstream.GetIpPort().Ip, + Value: destination.GetIpPort().Ip, }, corev1.EnvVar{ Name: fmt.Sprintf("%s_%s_CONNECT_SERVICE_PORT", serviceName, portName), - Value: strconv.Itoa(int(upstream.GetIpPort().Port)), + Value: strconv.Itoa(int(destination.GetIpPort().Port)), }) } diff --git a/control-plane/connect-inject/webhookv2/mesh_webhook_test.go b/control-plane/connect-inject/webhookv2/mesh_webhook_test.go index ffaf499621..ee68db5c14 100644 --- a/control-plane/connect-inject/webhookv2/mesh_webhook_test.go +++ b/control-plane/connect-inject/webhookv2/mesh_webhook_test.go @@ -210,7 +210,7 @@ func TestHandlerHandle(t *testing.T) { }, }, { - "pod with upstreams specified", + "pod with destinations specified", MeshWebhook{ Log: logrtest.New(t), AllowK8sNamespacesSet: mapset.NewSetWith("*"), @@ -268,7 +268,7 @@ func TestHandlerHandle(t *testing.T) { }, }, { - "error pod with incorrect upstreams specified", + "error pod with incorrect destinations specified", MeshWebhook{ Log: logrtest.New(t), AllowK8sNamespacesSet: mapset.NewSetWith("*"), diff --git a/control-plane/go.mod b/control-plane/go.mod index 920e3b3907..d8c28e4932 100644 --- a/control-plane/go.mod +++ b/control-plane/go.mod @@ -3,7 +3,7 @@ module github.com/hashicorp/consul-k8s/control-plane // TODO: remove these when the SDK is released for Consul 1.17 and coinciding patch releases replace ( // This replace directive is needed because `api` requires 0.4.1 of proto-public but we need an unreleased version - github.com/hashicorp/consul/proto-public v0.4.1 => github.com/hashicorp/consul/proto-public v0.1.2-0.20230922204015-ac9209d8fba9 + github.com/hashicorp/consul/proto-public v0.4.1 => github.com/hashicorp/consul/proto-public v0.1.2-0.20230923011829-3436b2921af5 // This replace directive is needed because `api` requires 0.14.1 of `sdk` but we need an unreleased version github.com/hashicorp/consul/sdk v0.14.1 => github.com/hashicorp/consul/sdk v0.4.1-0.20230911164019-a69e901660bd ) diff --git a/control-plane/go.sum b/control-plane/go.sum index 8d35f5daaf..65ce4fd014 100644 --- a/control-plane/go.sum +++ b/control-plane/go.sum @@ -265,8 +265,8 @@ github.com/hashicorp/consul-server-connection-manager v0.1.4 h1:wrcSRV6WGXFBNpNb github.com/hashicorp/consul-server-connection-manager v0.1.4/go.mod h1:LMqHkALoLP0HUQKOG21xXYr0YPUayIQIHNTlmxG100E= github.com/hashicorp/consul/api v1.10.1-0.20230914174054-e5808d85f751 h1:LnzgDq4e7ZfM1+XS6S21B9taQrbfdydXenL1xHyG1PQ= github.com/hashicorp/consul/api v1.10.1-0.20230914174054-e5808d85f751/go.mod h1:/Fz5sgOC0a5XY0BmPGj7aDSZRNgySLm02lV4xkU4DS4= -github.com/hashicorp/consul/proto-public v0.1.2-0.20230922204015-ac9209d8fba9 h1:3Xux09euVvBRlu66yJaPVasZf+OxFUlmCBzaigHwtEs= -github.com/hashicorp/consul/proto-public v0.1.2-0.20230922204015-ac9209d8fba9/go.mod h1:KAOxsaELPpA7JX10kMeygAskAqsQnu3SPgeruMhYZMU= +github.com/hashicorp/consul/proto-public v0.1.2-0.20230923011829-3436b2921af5 h1:5u/qgx4HVbPlTN6xvbgEd7ayjl1AL2pXThvipdE7ofw= +github.com/hashicorp/consul/proto-public v0.1.2-0.20230923011829-3436b2921af5/go.mod h1:KAOxsaELPpA7JX10kMeygAskAqsQnu3SPgeruMhYZMU= github.com/hashicorp/consul/sdk v0.4.1-0.20230911164019-a69e901660bd h1:tRrSgVY71Jl6T2lJUokMLj3T1MO9uiSvW0CieBkjTvo= github.com/hashicorp/consul/sdk v0.4.1-0.20230911164019-a69e901660bd/go.mod h1:vFt03juSzocLRFo59NkeQHHmQa6+g7oU0pfzdI1mUhg= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=