Skip to content

Commit

Permalink
Rename pbmesh.Upstreams to pbmesh.Destinations (#3005)
Browse files Browse the repository at this point in the history
* Rename pbmesh.Upstreams to pbmesh.Destinations

* fix traffic perm acceptance tests fixture

* more mesh v2 acceptance test fixes

* kick off k8s tests

* update images
  • Loading branch information
ishustava authored Sep 26, 2023
1 parent 04a8d98 commit 78ad376
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ spec:
destination:
identityName: multiport
action: allow
sources:
- identityName: static-client
permissions:
- sources:
- identityName: static-client
14 changes: 7 additions & 7 deletions acceptance/tests/mesh_v2/mesh_inject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ func TestMeshInject_MultiportService(t *testing.T) {
ctx := suite.Environment().DefaultContext(t)

helmValues := map[string]string{
"global.image": "thisisnotashwin/consul:foo",
"global.imageK8S": "thisisnotashwin/consul-k8s:foo",
"global.imageConsulDataplane": "jmurrethc/consul-dataplane-dev",
"global.experiments[0]": "resource-apis",
"global.experiments[0]": "resource-apis",
// The UI is not supported for v2 in 1.17, so for now it must be disabled.
"ui.enabled": "false",
"connectInject.enabled": "true",
Expand Down Expand Up @@ -79,16 +76,19 @@ func TestMeshInject_MultiportService(t *testing.T) {

{
// TODO: once ACLs are implemented, only run this block when secure=true and delete the below line and fixture since the default is deny when secure is true.
k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.NoCleanup, cfg.DebugDirectory, "../../tests/fixtures/cases/trafficpermissions-deny")
k8s.KubectlApplyK(t, ctx.KubectlOptions(t), "../../tests/fixtures/cases/trafficpermissions-deny")
// Now test that traffic is denied between the source and the destination.
if cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://multiport:8080")
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://multiport:9090")
} else {
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234")
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://multiport:2345")
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), connhelper.StaticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:2345")
}
k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.NoCleanup, cfg.DebugDirectory, "../../tests/fixtures/bases/trafficpermissions")
k8s.KubectlApplyK(t, ctx.KubectlOptions(t), "../../tests/fixtures/bases/trafficpermissions")
helpers.Cleanup(t, cfg.NoCleanupOnFailure, cfg.NoCleanup, func() {
k8s.KubectlDeleteK(t, ctx.KubectlOptions(t), "../../tests/fixtures/bases/trafficpermissions")
})
}

// Check connection from static-client to multiport.
Expand Down
86 changes: 43 additions & 43 deletions control-plane/connect-inject/common/annotation_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@ const (
ConsulNodeAddress = "127.0.0.1"
)

// ProcessPodUpstreamsForMeshWebhook reads the list of upstreams from the Pod annotation and converts them into a pbmesh.Upstreams
// ProcessPodDestinationsForMeshWebhook reads the list of destinations from the Pod annotation and converts them into a pbmesh.Destinations
// object.
func ProcessPodUpstreamsForMeshWebhook(pod corev1.Pod) (*pbmesh.Upstreams, error) {
return ProcessPodUpstreams(pod, true, true)
func ProcessPodDestinationsForMeshWebhook(pod corev1.Pod) (*pbmesh.Destinations, error) {
return ProcessPodDestinations(pod, true, true)
}

// ProcessPodUpstreams reads the list of upstreams from the Pod annotation and converts them into a pbmesh.Upstreams
// ProcessPodDestinations reads the list of destinations from the Pod annotation and converts them into a pbmesh.Destinations
// object.
func ProcessPodUpstreams(pod corev1.Pod, enablePartitions, enableNamespaces bool) (*pbmesh.Upstreams, error) {
upstreams := &pbmesh.Upstreams{}
func ProcessPodDestinations(pod corev1.Pod, enablePartitions, enableNamespaces bool) (*pbmesh.Destinations, error) {
destinations := &pbmesh.Destinations{}
raw, ok := pod.Annotations[constants.AnnotationMeshDestinations]
if !ok || raw == "" {
return nil, nil
}

upstreams.Workloads = &pbcatalog.WorkloadSelector{
destinations.Workloads = &pbcatalog.WorkloadSelector{
Names: []string{pod.Name},
}

for _, raw := range strings.Split(raw, ",") {
var upstream *pbmesh.Upstream
var destination *pbmesh.Destination

// Determine the type of processing required unlabeled or labeled
// [service-port-name].[service-name].[service-namespace].[service-partition]:[port]:[optional datacenter]
Expand All @@ -62,38 +62,38 @@ func ProcessPodUpstreams(pod corev1.Pod, enablePartitions, enableNamespaces bool

if labeledFormat {
var err error
upstream, err = processPodLabeledUpstream(pod, raw, enablePartitions, enableNamespaces)
destination, err = processPodLabeledDestination(pod, raw, enablePartitions, enableNamespaces)
if err != nil {
return &pbmesh.Upstreams{}, err
return nil, err
}
} else {
var err error
upstream, err = processPodUnlabeledUpstream(pod, raw, enablePartitions, enableNamespaces)
destination, err = processPodUnlabeledDestination(pod, raw, enablePartitions, enableNamespaces)
if err != nil {
return &pbmesh.Upstreams{}, err
return nil, err
}
}

upstreams.Upstreams = append(upstreams.Upstreams, upstream)
destinations.Destinations = append(destinations.Destinations, destination)
}

return upstreams, nil
return destinations, nil
}

// processPodLabeledUpstream processes an upstream in the format:
// processPodLabeledDestination processes a destination in the format:
// [service-port-name].port.[service-name].svc.[service-namespace].ns.[service-peer].peer:[port]
// [service-port-name].port.[service-name].svc.[service-namespace].ns.[service-partition].ap:[port]
// [service-port-name].port.[service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port].
// peer/ap/dc are mutually exclusive. At minimum service-port-name and service-name are required.
// The ordering matters for labeled as well as unlabeled. The ordering of the labeled parameters should follow
// the order and requirements of the unlabeled parameters.
// TODO: enable dc and peer support when ready, currently return errors if set.
func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Upstream, error) {
func processPodLabeledDestination(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Destination, error) {
parts := strings.SplitN(rawUpstream, ":", 3)
var port int32
port, _ = PortValue(pod, strings.TrimSpace(parts[1]))
if port <= 0 {
return &pbmesh.Upstream{}, fmt.Errorf("port value %d in upstream is invalid: %s", port, rawUpstream)
return nil, fmt.Errorf("port value %d in destination is invalid: %s", port, rawUpstream)
}

service := parts[0]
Expand All @@ -108,37 +108,37 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti
case "peer":
// TODO: uncomment and remove error when peers supported
//peer = strings.TrimSpace(pieces[6])
return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support peers: %s", rawUpstream)
return nil, fmt.Errorf("destination currently does not support peers: %s", rawUpstream)
case "ap":
partition = strings.TrimSpace(pieces[6])
case "dc":
// TODO: uncomment and remove error when datacenters are supported
//datacenter = strings.TrimSpace(pieces[6])
return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support datacenters: %s", rawUpstream)
return nil, fmt.Errorf("destination currently does not support datacenters: %s", rawUpstream)
default:
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
fallthrough
case 6:
if strings.TrimSpace(pieces[5]) == "ns" {
namespace = strings.TrimSpace(pieces[4])
} else {
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
fallthrough
case 4:
if strings.TrimSpace(pieces[3]) == "svc" {
svcName = strings.TrimSpace(pieces[2])
} else {
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
if strings.TrimSpace(pieces[1]) == "port" {
portName = strings.TrimSpace(pieces[0])
} else {
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
default:
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
} else {
switch len(pieces) {
Expand All @@ -148,33 +148,33 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti
case "peer":
// TODO: uncomment and remove error when peers supported
//peer = strings.TrimSpace(pieces[4])
return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support peers: %s", rawUpstream)
return nil, fmt.Errorf("destination currently does not support peers: %s", rawUpstream)
case "dc":
// TODO: uncomment and remove error when datacenter supported
//datacenter = strings.TrimSpace(pieces[4])
return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support datacenters: %s", rawUpstream)
return nil, fmt.Errorf("destination currently does not support datacenters: %s", rawUpstream)
default:
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
// TODO: uncomment and remove error when datacenter and/or peers supported
//fallthrough
case 4:
if strings.TrimSpace(pieces[3]) == "svc" {
svcName = strings.TrimSpace(pieces[2])
} else {
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
if strings.TrimSpace(pieces[1]) == "port" {
portName = strings.TrimSpace(pieces[0])
} else {
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
default:
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
}

upstream := pbmesh.Upstream{
destination := pbmesh.Destination{
DestinationRef: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Tenancy: &pbresource.Tenancy{
Expand All @@ -186,32 +186,32 @@ func processPodLabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartiti
},
DestinationPort: portName,
Datacenter: datacenter,
ListenAddr: &pbmesh.Upstream_IpPort{
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Port: uint32(port),
Ip: ConsulNodeAddress,
},
},
}

return &upstream, nil
return &destination, nil
}

// processPodUnlabeledUpstream processes an upstream in the format:
// processPodUnlabeledDestination processes a destination in the format:
// [service-port-name].[service-name].[service-namespace].[service-partition]:[port]:[optional datacenter].
// There is no unlabeled field for peering.
// TODO: enable dc and peer support when ready, currently return errors if set.
func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Upstream, error) {
func processPodUnlabeledDestination(pod corev1.Pod, rawUpstream string, enablePartitions, enableNamespaces bool) (*pbmesh.Destination, error) {
var portName, datacenter, svcName, namespace, partition string
var port int32
var upstream pbmesh.Upstream
var destination pbmesh.Destination

parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = PortValue(pod, strings.TrimSpace(parts[1]))

// If Consul Namespaces or Admin Partitions are enabled, attempt to parse the
// upstream for a namespace.
// destination for a namespace.
if enableNamespaces || enablePartitions {
pieces := strings.SplitN(parts[0], ".", 4)
switch len(pieces) {
Expand All @@ -225,12 +225,12 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti
svcName = strings.TrimSpace(pieces[1])
portName = strings.TrimSpace(pieces[0])
default:
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
} else {
pieces := strings.SplitN(parts[0], ".", 2)
if len(pieces) < 2 {
return &pbmesh.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
return nil, fmt.Errorf("destination structured incorrectly: %s", rawUpstream)
}
svcName = strings.TrimSpace(pieces[1])
portName = strings.TrimSpace(pieces[0])
Expand All @@ -240,11 +240,11 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti
if len(parts) > 2 {
// TODO: uncomment and remove error when datacenters supported
//datacenter = strings.TrimSpace(parts[2])
return &pbmesh.Upstream{}, fmt.Errorf("upstream currently does not support datacenters: %s", rawUpstream)
return nil, fmt.Errorf("destination currently does not support datacenters: %s", rawUpstream)
}

if port > 0 {
upstream = pbmesh.Upstream{
destination = pbmesh.Destination{
DestinationRef: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Tenancy: &pbresource.Tenancy{
Expand All @@ -256,13 +256,13 @@ func processPodUnlabeledUpstream(pod corev1.Pod, rawUpstream string, enableParti
},
DestinationPort: portName,
Datacenter: datacenter,
ListenAddr: &pbmesh.Upstream_IpPort{
ListenAddr: &pbmesh.Destination_IpPort{
IpPort: &pbmesh.IPPortAddress{
Port: uint32(port),
Ip: ConsulNodeAddress,
},
},
}
}
return &upstream, nil
return &destination, nil
}
Loading

0 comments on commit 78ad376

Please sign in to comment.