diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index e866ef5d2c7..7051d21f4d2 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -986,6 +986,9 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M epPredicates...)); err != nil { return err } + if err := addEnvoyProxyIndexers(ctx, mgr); err != nil { + return err + } // Watch Gateway CRUDs and reconcile affected GatewayClass. gPredicates := []predicate.TypedPredicate[*gwapiv1.Gateway]{ diff --git a/internal/provider/kubernetes/indexers.go b/internal/provider/kubernetes/indexers.go index 62b11f68903..57c13997fe5 100644 --- a/internal/provider/kubernetes/indexers.go +++ b/internal/provider/kubernetes/indexers.go @@ -9,6 +9,7 @@ import ( "context" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -40,6 +41,7 @@ const ( secretCtpIndex = "secretCtpIndex" configMapBtlsIndex = "configMapBtlsIndex" backendEnvoyExtensionPolicyIndex = "backendEnvoyExtensionPolicyIndex" + backendEnvoyProxyTelemetryIndex = "backendEnvoyProxyTelemetryIndex" ) func addReferenceGrantIndexers(ctx context.Context, mgr manager.Manager) error { @@ -109,6 +111,99 @@ func backendHTTPRouteIndexFunc(rawObj client.Object) []string { return backendRefs } +func addEnvoyProxyIndexers(ctx context.Context, mgr manager.Manager) error { + if err := mgr.GetFieldIndexer().IndexField(ctx, &v1alpha1.EnvoyProxy{}, backendEnvoyProxyTelemetryIndex, backendEnvoyProxyTelemetryIndexFunc); err != nil { + return err + } + + return nil +} + +func backendEnvoyProxyTelemetryIndexFunc(rawObj client.Object) []string { + ep := rawObj.(*v1alpha1.EnvoyProxy) + + refs := sets.New[string]() + refs.Insert(accessLogRefs(ep)...) + refs.Insert(metricRefs(ep)...) + refs.Insert(traceRefs(ep)...) + + return refs.UnsortedList() +} + +func accessLogRefs(ep *v1alpha1.EnvoyProxy) []string { + var refs []string + + if ep.Spec.Telemetry == nil || ep.Spec.Telemetry.Metrics == nil { + return refs + } + + for _, sink := range ep.Spec.Telemetry.Metrics.Sinks { + if sink.OpenTelemetry != nil { + otel := sink.OpenTelemetry + if otel.BackendRefs != nil { + for _, ref := range otel.BackendRefs { + if ref.Kind == nil || string(*ref.Kind) == gatewayapi.KindService { + refs = append(refs, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(ref.Namespace, ep.Namespace), + Name: string(ref.Name), + }.String(), + ) + } + } + } + } + } + + return refs +} + +func metricRefs(ep *v1alpha1.EnvoyProxy) []string { + var refs []string + + if ep.Spec.Telemetry == nil || ep.Spec.Telemetry.Metrics == nil { + return refs + } + + for _, sink := range ep.Spec.Telemetry.Metrics.Sinks { + if sink.OpenTelemetry != nil { + for _, backend := range sink.OpenTelemetry.BackendRefs { + if backend.Kind == nil || string(*backend.Kind) == gatewayapi.KindService { + refs = append(refs, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(backend.Namespace, ep.Namespace), + Name: string(backend.Name), + }.String(), + ) + } + } + } + } + + return refs +} + +func traceRefs(ep *v1alpha1.EnvoyProxy) []string { + var refs []string + + if ep.Spec.Telemetry == nil || ep.Spec.Telemetry.Tracing == nil { + return refs + } + + for _, ref := range ep.Spec.Telemetry.Tracing.Provider.BackendRefs { + if ref.Kind == nil || string(*ref.Kind) == gatewayapi.KindService { + refs = append(refs, + types.NamespacedName{ + Namespace: gatewayapi.NamespaceDerefOr(ref.Namespace, ep.Namespace), + Name: string(ref.Name), + }.String(), + ) + } + } + + return refs +} + // addGRPCRouteIndexers adds indexing on GRPCRoute, for Service objects that are // referenced in GRPCRoute objects via `.spec.rules.backendRefs`. This helps in // querying for GRPCRoutes that are affected by a particular Service CRUD. diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index c852bffda73..9b6eb4a8724 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -249,6 +249,10 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo return true } + if r.isEnvoyProxyReferencingBackend(&nsName) { + return true + } + return r.isEnvoyExtensionPolicyReferencingBackend(&nsName) } @@ -547,3 +551,15 @@ func (r *gatewayAPIReconciler) isEnvoyExtensionPolicyReferencingBackend(nsName * return len(spList.Items) > 0 } + +func (r *gatewayAPIReconciler) isEnvoyProxyReferencingBackend(nn *types.NamespacedName) bool { + proxyList := &egv1a1.EnvoyProxyList{} + if err := r.client.List(context.Background(), proxyList, &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(backendEnvoyProxyTelemetryIndex, nn.String()), + }); err != nil { + r.log.Error(err, "unable to find associated EnvoyProxies") + return false + } + + return len(proxyList.Items) > 0 +} diff --git a/internal/provider/kubernetes/predicates_test.go b/internal/provider/kubernetes/predicates_test.go index debeacdf1f7..f5416c198d0 100644 --- a/internal/provider/kubernetes/predicates_test.go +++ b/internal/provider/kubernetes/predicates_test.go @@ -379,6 +379,70 @@ func TestValidateEndpointSliceForReconcile(t *testing.T) { func TestValidateServiceForReconcile(t *testing.T) { sampleGateway := test.GetGateway(types.NamespacedName{Namespace: "default", Name: "scheduled-status-test"}, "test-gc", 8080) mergeGatewaysConfig := test.GetEnvoyProxy(types.NamespacedName{Namespace: "default", Name: "merge-gateways-config"}, true) + telemetryEnabledGatewaysConfig := &v1alpha1.EnvoyProxy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "telemetry", + }, + Spec: v1alpha1.EnvoyProxySpec{ + Telemetry: &v1alpha1.ProxyTelemetry{ + AccessLog: &v1alpha1.ProxyAccessLog{ + Settings: []v1alpha1.ProxyAccessLogSetting{ + { + Sinks: []v1alpha1.ProxyAccessLogSink{ + { + Type: v1alpha1.ProxyAccessLogSinkTypeOpenTelemetry, + OpenTelemetry: &v1alpha1.OpenTelemetryEnvoyProxyAccessLog{ + BackendRefs: []v1alpha1.BackendRef{ + { + BackendObjectReference: gwapiv1.BackendObjectReference{ + Name: "otel-collector", + Namespace: ptr.To(gwapiv1.Namespace("default")), + Port: ptr.To(gwapiv1.PortNumber(4317)), + }, + }, + }, + }, + }, + }, + }, + }, + }, + Metrics: &v1alpha1.ProxyMetrics{ + Sinks: []v1alpha1.ProxyMetricSink{ + { + Type: v1alpha1.MetricSinkTypeOpenTelemetry, + OpenTelemetry: &v1alpha1.ProxyOpenTelemetrySink{ + BackendRefs: []v1alpha1.BackendRef{ + { + BackendObjectReference: gwapiv1.BackendObjectReference{ + Name: "otel-collector", + Namespace: ptr.To(gwapiv1.Namespace("default")), + Port: ptr.To(gwapiv1.PortNumber(4317)), + }, + }, + }, + }, + }, + }, + }, + Tracing: &v1alpha1.ProxyTracing{ + Provider: v1alpha1.TracingProvider{ + Type: v1alpha1.TracingProviderTypeOpenTelemetry, + BackendRefs: []v1alpha1.BackendRef{ + { + BackendObjectReference: gwapiv1.BackendObjectReference{ + Name: "otel-collector", + Namespace: ptr.To(gwapiv1.Namespace("default")), + Port: ptr.To(gwapiv1.PortNumber(4317)), + }, + }, + }, + }, + }, + }, + }, + } testCases := []struct { name string @@ -487,6 +551,34 @@ func TestValidateServiceForReconcile(t *testing.T) { service: test.GetService(types.NamespacedName{Name: "service"}, nil, nil), expect: true, }, + { + name: "service referenced by EnvoyProxy updated", + configs: []client.Object{ + test.GetGatewayClass("test-mg", v1alpha1.GatewayControllerName, &test.GroupKindNamespacedName{ + Group: gwapiv1.Group(telemetryEnabledGatewaysConfig.GroupVersionKind().Group), + Kind: gwapiv1.Kind(telemetryEnabledGatewaysConfig.Kind), + Namespace: gwapiv1.Namespace(telemetryEnabledGatewaysConfig.Namespace), + Name: gwapiv1.ObjectName(telemetryEnabledGatewaysConfig.Name), + }), + telemetryEnabledGatewaysConfig, + }, + service: test.GetService(types.NamespacedName{Name: "otel-collector", Namespace: "default"}, nil, nil), + expect: true, + }, + { + name: "service referenced by EnvoyProxy unrelated", + configs: []client.Object{ + test.GetGatewayClass("test-mg", v1alpha1.GatewayControllerName, &test.GroupKindNamespacedName{ + Group: gwapiv1.Group(telemetryEnabledGatewaysConfig.GroupVersionKind().Group), + Kind: gwapiv1.Kind(telemetryEnabledGatewaysConfig.Kind), + Namespace: gwapiv1.Namespace(telemetryEnabledGatewaysConfig.Namespace), + Name: gwapiv1.ObjectName(telemetryEnabledGatewaysConfig.Name), + }), + telemetryEnabledGatewaysConfig, + }, + service: test.GetService(types.NamespacedName{Name: "otel-collector-unrelated", Namespace: "default"}, nil, nil), + expect: false, + }, { name: "service referenced by SecurityPolicy ExtAuth HTTP service", configs: []client.Object{ @@ -634,6 +726,7 @@ func TestValidateServiceForReconcile(t *testing.T) { WithIndex(&gwapiv1a2.UDPRoute{}, backendUDPRouteIndex, backendUDPRouteIndexFunc). WithIndex(&v1alpha1.SecurityPolicy{}, backendSecurityPolicyIndex, backendSecurityPolicyIndexFunc). WithIndex(&v1alpha1.EnvoyExtensionPolicy{}, backendEnvoyExtensionPolicyIndex, backendEnvoyExtensionPolicyIndexFunc). + WithIndex(&v1alpha1.EnvoyProxy{}, backendEnvoyProxyTelemetryIndex, backendEnvoyProxyTelemetryIndexFunc). Build() t.Run(tc.name, func(t *testing.T) { res := r.validateServiceForReconcile(tc.service)