Skip to content

Commit

Permalink
CrossNamespace: e2e test to verify event delivery for broker and trig…
Browse files Browse the repository at this point in the history
…ger (knative#7933)

* data plane changes

* moved global variables inside function

* updated other functions

* feature flag

* verifying events get delivered

* main test

* test for broker in different namespace

* changed to two environments

* updated namespace variables

* moved feature flag prerequisite

* fix global undefined problem

* updated creating namespace

* ctx variable name update

* getting rid of trigger goes ready since it's done in feature

* adding withbrokerref option for trigger install

* changing the way parameter is passed in

* refactor brokername parameter

* clean up unused variables

* added missing broker name

* deleted unused parameter

* enabling feature flag

* fixed argument

* cleaned up unused variable

* manually resolving URL for sending events

* use StartSenderToNamespacedResource

* Update code

* comments clean up

* Update go.mod

* Update go.sum

* Update modules.txt

* Update forwarder.go

* update codegen

* debug unit test

* comments clean up

* clean up unused variables

* undo changes to owners_aliases

* updated broker filter handler

* install broker

* install trigger and broker

* adding subscriber to triggerCfg

* added withbrokerref function

* updated filter handler checks

* updated broker in reportArgs

* updated broker handling in handleDispatchToSubscribeRequest

* updated brokerref in handleDispatchToReplyRequest

* updated WithBrokerRef fcn

* fix(test): the crossnamespace e2e test passes now (#1)

Signed-off-by: Calum Murray <[email protected]>

* fixed typo

* fix: feature flags are passed correctly (#2)

Signed-off-by: Calum Murray <[email protected]>

* Update test/rekt/crossnamespace_test.go

Co-authored-by: Calum Murray <[email protected]>

* Update test/rekt/features/broker/crossnamespace.go

Co-authored-by: Calum Murray <[email protected]>

* Update test/rekt/features/trigger/crossnamespace.go

Co-authored-by: Calum Murray <[email protected]>

* cleaned up comments

---------

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
  • Loading branch information
yijie-04 and Cali0707 authored Jul 9, 2024
1 parent 71d7e5e commit 8e7c775
Show file tree
Hide file tree
Showing 30 changed files with 356 additions and 95 deletions.
59 changes: 39 additions & 20 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,24 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
var brokerRef, brokerNamespace string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" {
brokerRef = trigger.Spec.BrokerRef.Name
brokerNamespace = trigger.Spec.BrokerRef.Namespace
var brokerName, brokerNamespace string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil {
if trigger.Spec.BrokerRef.Name != "" {
brokerName = trigger.Spec.BrokerRef.Name
} else {
brokerName = trigger.Spec.Broker
}
if trigger.Spec.BrokerRef.Namespace != "" {
brokerNamespace = trigger.Spec.BrokerRef.Namespace
} else {
brokerNamespace = trigger.Namespace
}
} else {
brokerRef = trigger.Spec.Broker
brokerName = trigger.Spec.Broker
brokerNamespace = trigger.Namespace
}

broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef)
broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName)
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand All @@ -248,7 +256,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: brokerRef,
broker: brokerName,
requestType: "reply_forward",
}

Expand All @@ -265,16 +273,23 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
}

func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
var brokerRef, brokerNamespace string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" {
brokerRef = trigger.Spec.BrokerRef.Name
brokerNamespace = trigger.Spec.BrokerRef.Namespace
var brokerName, brokerNamespace string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil {
if trigger.Spec.BrokerRef.Name != "" {
brokerName = trigger.Spec.BrokerRef.Name
} else {
brokerName = trigger.Spec.Broker
}
if trigger.Spec.BrokerRef.Namespace != "" {
brokerNamespace = trigger.Spec.BrokerRef.Namespace
} else {
brokerNamespace = trigger.Namespace
}
} else {
brokerRef = trigger.Spec.Broker
brokerName = trigger.Spec.Broker
brokerNamespace = trigger.Namespace
}

broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef)
broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName)
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand All @@ -299,7 +314,7 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
broker: brokerName,
requestType: "dls_forward",
}

Expand All @@ -316,11 +331,15 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
}

func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
var brokerRef string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" {
brokerRef = trigger.Spec.BrokerRef.Name
var brokerName string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil {
if trigger.Spec.BrokerRef.Name != "" {
brokerName = trigger.Spec.BrokerRef.Name
} else {
brokerName = trigger.Spec.Broker
}
} else {
brokerRef = trigger.Spec.Broker
brokerName = trigger.Spec.Broker
}

triggerRef := types.NamespacedName{
Expand All @@ -346,7 +365,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: brokerRef,
broker: brokerName,
filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"),
requestType: "filter",
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/reconciler/broker/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewController(
FilterFunc: brokerFilter,
Handler: controller.HandleAll(func(obj interface{}) {
if broker, ok := obj.(*eventing.Broker); ok {
for _, t := range getTriggersForBroker(ctx, logger, triggerLister, broker) {
for _, t := range getTriggersForBroker(logger, triggerLister, broker, featureStore.Load()) {
impl.Enqueue(t)
}
}
Expand Down Expand Up @@ -163,6 +163,8 @@ func filterTriggers(featureStore *feature.Store, lister eventinglisters.BrokerLi
return false
}

var broker string
var brokerNamespace string
if featureStore.IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil {
broker = trigger.Spec.BrokerRef.Name
brokerNamespace = trigger.Spec.BrokerRef.Namespace
Expand All @@ -185,7 +187,7 @@ func filterTriggers(featureStore *feature.Store, lister eventinglisters.BrokerLi
// the Triggers belonging to it. As there is no way to return failures in the
// Informers EventHandler, errors are logged, and an empty array is returned in case
// of failures.
func getTriggersForBroker(ctx context.Context, logger *zap.SugaredLogger, triggerLister eventinglisters.TriggerLister, broker *eventing.Broker) []*eventing.Trigger {
func getTriggersForBroker(logger *zap.SugaredLogger, triggerLister eventinglisters.TriggerLister, broker *eventing.Broker, features feature.Flags) []*eventing.Trigger {
r := make([]*eventing.Trigger, 0)
selector := labels.SelectorFromSet(map[string]string{apiseventing.BrokerLabelKey: broker.Name})
triggers, err := triggerLister.Triggers(metav1.NamespaceAll).List(selector)
Expand All @@ -194,10 +196,9 @@ func getTriggersForBroker(ctx context.Context, logger *zap.SugaredLogger, trigge
return r
}
for _, t := range triggers {
if feature.FromContext(ctx).IsCrossNamespaceEventLinks() && t.Spec.BrokerRef != nil && t.Spec.BrokerRef.Namespace == broker.Namespace {
if features.IsCrossNamespaceEventLinks() && t.Spec.BrokerRef != nil && t.Spec.BrokerRef.Namespace == broker.Namespace {
r = append(r, t)
}
if t.Namespace == broker.Namespace {
} else if t.Namespace == broker.Namespace {
r = append(r, t)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/reconciler/broker/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ func TestGetTriggersForBroker(t *testing.T) {
ls := testingv1.NewListers(tt.in)
logger := logtesting.TestLogger(t)
triggerLister := ls.GetTriggerLister()
ctx := feature.ToContext(context.TODO(), feature.FromContextOrDefaults(context.TODO()))
triggers := getTriggersForBroker(ctx, logger, triggerLister, ReadyBroker())
flags := feature.FromContextOrDefaults(context.TODO())
triggers := getTriggersForBroker(logger, triggerLister, ReadyBroker(), flags)
var found []string
for _, want := range tt.out {
for _, got := range triggers {
Expand Down Expand Up @@ -348,8 +348,8 @@ func (failer *TriggerNamespaceListerFailer) Get(name string) (*eventing.Trigger,
func TestListFailure(t *testing.T) {
logger := logtesting.TestLogger(t)
triggerListerFailer := &TriggerListerFailer{}
ctx := feature.ToContext(context.TODO(), feature.FromContextOrDefaults(context.TODO()))
if len(getTriggersForBroker(ctx, logger, triggerListerFailer, ReadyBroker())) != 0 {
flags := feature.FromContextOrDefaults(context.TODO())
if len(getTriggersForBroker(logger, triggerListerFailer, ReadyBroker(), flags)) != 0 {
t.Fatalf("Got back triggers when not expecting any")
}
}
4 changes: 2 additions & 2 deletions pkg/reconciler/broker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ import (
)

var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker")
var brokerNamespace string
var broker string

const (
// Name of the corev1.Events emitted from the Trigger reconciliation process.
Expand Down Expand Up @@ -91,6 +89,8 @@ type Reconciler struct {
func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) pkgreconciler.Event {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Trigger", t))

var broker string
var brokerNamespace string
if t.Spec.BrokerRef != nil && feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) {
broker = t.Spec.BrokerRef.Name
brokerNamespace = t.Spec.BrokerRef.Namespace
Expand Down
5 changes: 4 additions & 1 deletion pkg/reconciler/sugar/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func NewController(
triggerInformer := trigger.Get(ctx)
brokerInformer := broker.Get(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("config-features"))
featureStore.WatchConfigs(cmw)

r := &Reconciler{
eventingClientSet: eventingclient.Get(ctx),
brokerLister: brokerInformer.Lister(),
Expand All @@ -73,7 +76,7 @@ func NewController(
return
}
for _, t := range triggers {
if feature.FromContext(ctx).IsCrossNamespaceEventLinks() && t.Spec.BrokerRef != nil && t.Spec.BrokerRef.Namespace == b.Namespace {
if featureStore.Load().IsCrossNamespaceEventLinks() && t.Spec.BrokerRef != nil && t.Spec.BrokerRef.Namespace == b.Namespace {
impl.Enqueue(t)
} else if t.Namespace == b.Namespace {
impl.Enqueue(t)
Expand Down
6 changes: 6 additions & 0 deletions pkg/reconciler/sugar/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func TestNew(t *testing.T) {
"_example": "test-config",
},
},
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
Namespace: "knative-eventing",
},
},
))

if c == nil {
Expand Down
2 changes: 1 addition & 1 deletion test/config/config-features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data:

# ALPHA feature: The cross-namespace-event-links flag allows you to use cross-namespace referencing for Eventing.
# For more details: https://github.com/knative/eventing/issues/7739
cross-namespace-event-links: "disabled"
cross-namespace-event-links: "enabled"

# ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
# in APIServerSource objects with its rich filtering capabilities.
Expand Down
2 changes: 1 addition & 1 deletion test/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ function scale_controlplane() {

function create_knsubscribe_rolebinding() {
kubectl delete clusterrolebinding knsubscribe-test-rb --ignore-not-found=true
kubectl create clusterrolebinding knsubscribe-test-rb --user=$(kubectl auth whoami -ojson | jq .status.userInfo.username -r) --clusterrole=crossnamespace=subscriber
kubectl create clusterrolebinding knsubscribe-test-rb --user=$(kubectl auth whoami -ojson | jq .status.userInfo.username -r) --clusterrole=crossnamespace-subscriber
}

# Install Knative Monitoring in the current cluster.
Expand Down
6 changes: 3 additions & 3 deletions test/experimental/features/eventtype_autocreate/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func AutoCreateEventTypesOnBroker(brokerName string) *feature.Feature {
sink := feature.MakeRandomK8sName("sink")

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
f.Setup("install subscription", trigger.Install(triggerName, brokerName, trigger.WithSubscriber(service.AsKReference(sink), "")))
f.Setup("install subscription", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), "")))

f.Setup("trigger is ready", trigger.IsReady(triggerName))
f.Setup("broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName))
Expand Down Expand Up @@ -158,7 +158,7 @@ func AutoCreateEventTypesOnTrigger(brokerName string) *feature.Feature {
replyData := ""

f.Setup("install sink", eventshub.Install(sink, eventshub.ReplyWithTransformedEvent(replyType, replySource, replyData), eventshub.StartReceiver))
f.Setup("install trigger", trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(service.AsDestinationRef(sink)), trigger.WithFilter(map[string]string{
f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(service.AsDestinationRef(sink)), trigger.WithFilter(map[string]string{
"type": event.Type(),
})))

Expand Down Expand Up @@ -194,7 +194,7 @@ func AutoCreateEventTypeEventsFromPingSource() *feature.Feature {
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))
f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
f.Setup("install trigger", trigger.Install(via, brokerName, trigger.WithSubscriber(service.AsKReference(sink), "")))
f.Setup("install trigger", trigger.Install(via, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), "")))
f.Setup("trigger goes ready", trigger.IsReady(via))

f.Requirement("install pingsource", func(ctx context.Context, t feature.T) {
Expand Down
54 changes: 54 additions & 0 deletions test/rekt/crossnamespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//go:build e2e
// +build e2e

/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rekt

import (
"testing"

"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"

"knative.dev/eventing/test/rekt/features/trigger"
)

func TestBrokerTriggerCrossNamespaceReference(t *testing.T) {
t.Parallel()

brokerEnvCtx, _ := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

triggerEnvCtx, triggerEnv := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

// brokerEnv.Test(brokerEnvCtx, t, broker.GoesReady(brokerName))
triggerEnv.Test(triggerEnvCtx, t, trigger.CrossNamespaceEventLinks(brokerEnvCtx))
}
4 changes: 2 additions & 2 deletions test/rekt/features/apiserversource/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func SendsEventsWithEventTypes() *feature.Feature {
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))
f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
f.Setup("install trigger", trigger.Install(via, brokerName, trigger.WithSubscriber(service.AsKReference(sink), "")))
f.Setup("install trigger", trigger.Install(via, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sink), "")))
f.Setup("trigger goes ready", trigger.IsReady(via))

sacmName := feature.MakeRandomK8sName("apiserversource")
Expand Down Expand Up @@ -888,7 +888,7 @@ func SendsEventsWithBrokerAsSinkTLS() *feature.Feature {
f.Setup("install trigger", func(ctx context.Context, t feature.T) {
d := service.AsDestinationRef(sinkName)
d.CACerts = eventshub.GetCaCerts(ctx)
trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t)
trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriberFromDestination(d))(ctx, t)
})
f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName))

Expand Down
10 changes: 5 additions & 5 deletions test/rekt/features/broker/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func ControlPlaneTrigger_GivenBroker(brokerName string) *feature.Feature {
service.WithSelectors(map[string]string{"bad": "svc"})))

triggerName := feature.MakeRandomK8sName("trigger")
f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName,
f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName),
triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""),
))

Expand Down Expand Up @@ -142,7 +142,7 @@ func ControlPlaneTrigger_GivenBrokerTriggerReady(brokerName string) *feature.Fea
service.WithSelectors(map[string]string{"bad": "svc"})))

triggerName := feature.MakeRandomK8sName("trigger")
f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName,
f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName),
triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""),
))

Expand All @@ -167,7 +167,7 @@ func ControlPlaneTrigger_WithBrokerLifecycle(brokerOpts ...manifest.CfgFn) *feat
brokerName := feature.MakeRandomK8sName("broker")

triggerName := feature.MakeRandomK8sName("trigger")
f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName,
f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName),
triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""),
))

Expand Down Expand Up @@ -211,7 +211,7 @@ func ControlPlaneTrigger_WithValidFilters(brokerName string) *feature.Feature {
}

triggerName := feature.MakeRandomK8sName("trigger")
f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName,
f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName),
triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""),
triggerresources.WithFilter(filters),
))
Expand Down Expand Up @@ -261,7 +261,7 @@ func ControlPlaneTrigger_WithInvalidFilters(brokerName string) *feature.Feature
}

triggerName := feature.MakeRandomK8sName("trigger")
f.Setup("Create a Trigger", triggerresources.Install(triggerName, brokerName,
f.Setup("Create a Trigger", triggerresources.Install(triggerName, triggerresources.WithBrokerName(brokerName),
triggerresources.WithSubscriber(service.AsKReference(subscriberName), ""),
))

Expand Down
Loading

0 comments on commit 8e7c775

Please sign in to comment.