Skip to content

Commit

Permalink
[release-1.13] fix: triggers use correct consumer group id generated …
Browse files Browse the repository at this point in the history
…from template (#3779) (#3800)

* fix: triggers use correct consumer group id generated from template (#3779)

* add rekt test to catch trigger cg being wrong

Signed-off-by: Calum Murray <[email protected]>

* fix cm key name

Signed-off-by: Calum Murray <[email protected]>

* fix: trigger controller now watches for changes to the config-kafka-features cm

Signed-off-by: Calum Murray <[email protected]>

* fixed unit test

Signed-off-by: Calum Murray <[email protected]>

* also watch cm for namespaced triggers

Signed-off-by: Calum Murray <[email protected]>

* goimports

Signed-off-by: Calum Murray <[email protected]>

* don't use manifest.InstallYamlFS because it deletes the config-kafka-features cm after the test

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>

* cleanup: consumergroup template test does not modify configmap now (#3782)

* cleanup: consumergroup template test does not modify configmap now

Signed-off-by: Calum Murray <[email protected]>

* account for different possible cm keys

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Apr 3, 2024
1 parent 9acb72f commit 2792efb
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 2 deletions.
15 changes: 15 additions & 0 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
impl.FilteredGlobalResync(filterTriggers(reconciler.BrokerLister, kafka.BrokerClass, FinalizerName), triggerInformer.Informer())
}

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
if globalResync != nil {
globalResync(nil)
}
})
kafkaConfigStore.WatchConfigs(watcher)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(watcher)

configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName),
Handler: cache.ResourceEventHandlerFuncs{
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func TestNewController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-kafka-features",
},
}), &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
Expand Down
17 changes: 17 additions & 0 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"github.com/IBM/sarama"
apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -81,6 +82,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
NewKafkaClient: sarama.NewClient,
NewKafkaClusterAdminClient: sarama.NewClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
Expand Down Expand Up @@ -111,6 +113,21 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
impl.GlobalResync(brokerInformer.Informer())
}

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
if globalResync != nil {
globalResync(nil)
}
})
kafkaConfigStore.WatchConfigs(watcher)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(watcher)

configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: kafka.FilterWithLabel(kafka.NamespacedBrokerDataplaneLabelKey, kafka.NamespacedBrokerDataplaneLabelValue),
Handler: cache.ResourceEventHandlerFuncs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func TestNewNamespacedController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-kafka-features",
},
}), &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/trigger/namespaced_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type NamespacedReconciler struct {
NewKafkaClusterAdminClient kafka.NewClusterAdminClientFunc
NewKafkaClient kafka.NewClientFunc
InitOffsetsFunc kafka.InitOffsetsFunc
KafkaFeatureFlags *apisconfig.KafkaFeatureFlags
}

func (r *NamespacedReconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event {
Expand Down Expand Up @@ -89,7 +90,7 @@ func (r *NamespacedReconciler) createReconcilerForTriggerInstance(trigger *event
// override
BrokerClass: kafka.NamespacedBrokerClass,
DataPlaneConfigMapLabeler: kafka.NamespacedDataplaneLabelConfigmapOption,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
KafkaFeatureFlags: r.KafkaFeatureFlags,
NewKafkaClusterAdminClient: r.NewKafkaClusterAdminClient,
NewKafkaClient: r.NewKafkaClient,
InitOffsetsFunc: r.InitOffsetsFunc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing"
Expand Down Expand Up @@ -196,6 +197,7 @@ func useNamespacedTable(t *testing.T, table TableTest, env *config.Env) {
T: t,
}, nil
},
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, tracker.New(func(name types.NamespacedName) {}, 0))
Expand Down
26 changes: 26 additions & 0 deletions test/config-kafka-features/new-cg-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2024 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-features
namespace: knative-eventing
data:
dispatcher-rate-limiter: "disabled"
dispatcher-ordered-executor-metrics: "disabled"
controller-autoscaler-keda: "disabled"
triggers-consumergroup-template: "test-{{ .Namespace }}-{{ .Name }}-trigger"
brokers-topic-template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels-topic-template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
26 changes: 26 additions & 0 deletions test/config-kafka-features/restore-cg-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2024 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-features
namespace: knative-eventing
data:
dispatcher-rate-limiter: "disabled"
dispatcher-ordered-executor-metrics: "disabled"
controller-autoscaler-keda: "disabled"
triggers-consumergroup-template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers-topic-template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels-topic-template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
12 changes: 12 additions & 0 deletions test/e2e_new/broker_sasl_ssl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ func TestRestrictedBrokerAuthSslSaslScram512(t *testing.T) {
env.Test(ctx, t, features.SetupBrokerAuthRestrictedSslSaslScram512(ctx))
}

func TestTriggerUsesConsumerGroupIDFromTemplate(t *testing.T) {
ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.Test(ctx, t, features.TriggerUsesConsumerGroupIDTemplate())
}

func TestBrokerNotReadyWithoutAuthSecret(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 8 additions & 0 deletions test/reconciler-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail

go_test_e2e -tags=deletecm ./test/e2e_new/... || fail_test "E2E (new deletecm) suite failed"

echo "Running E2E Reconciler tests with consumergroup id template changed"

kubectl apply -f "$(dirname "$0")/config-kafka-features/new-cg-id.yaml"

go_test_e2e -tags=e2e -timeout=15m ./test/e2e_new -run TestTriggerUsesConsumerGroupIDFromTemplate

kubectl apply -f "$(dirname "$0")/config-kafka-features/restore-cg-id.yaml"

echo "Running E2E Reconciler Tests with strict transport encryption"

kubectl apply -Rf "$(dirname "$0")/config-transport-encryption"
Expand Down
82 changes: 81 additions & 1 deletion test/rekt/features/broker_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,34 @@
package features

import (
"bytes"
"context"
"text/template"
"time"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/resources/service"

"github.com/cloudevents/sdk-go/v2/test"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/google/uuid"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret"

triggersclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"

"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"

"knative.dev/reconciler-test/resources/svc"

brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
Expand Down Expand Up @@ -142,3 +153,72 @@ func BrokerNotReadyWithoutAuthSecret() *feature.Feature {

return f
}

func TriggerUsesConsumerGroupIDTemplate() *feature.Feature {
f := feature.NewFeature()

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")
sinkName := feature.MakeRandomK8sName("sink")

f.Setup("install broker", broker.Install(brokerName))
f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))

f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))

f.Requirement("install trigger", trigger.Install(triggerName, brokerName, trigger.WithSubscriber(service.AsKReference(sinkName), "")))
f.Requirement("trigger is ready", trigger.IsReady(triggerName))

// check that the trigger has the correct annotation
f.Assert("trigger has correct consumergroup template", checkTriggerConsumerGroupIDAnnotation(triggerName))

return f
}

func checkTriggerConsumerGroupIDAnnotation(triggerName string) feature.StepFn {
return func(ctx context.Context, t feature.T) {
ns := environment.FromContext(ctx).Namespace()

cm, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, "config-kafka-features", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

expectedTemplateStr, ok := cm.Data["triggers-consumergroup-template"]
if !ok {
// there are two keys the value could be in
expectedTemplateStr, ok = cm.Data["triggers.consumergroup.template"]
if !ok {
t.Fatal("no consumergroup template in config-kafka-features")
}
}

expectedTemplate, err := template.New("consumergroup-id").Parse(expectedTemplateStr)
if err != nil {
t.Fatal(err)
}

trig, err := triggersclient.Get(ctx).EventingV1().Triggers(ns).Get(ctx, triggerName, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

var expectedBytes bytes.Buffer
err = expectedTemplate.Execute(&expectedBytes, trig.ObjectMeta)
if err != nil {
t.Fatal(err)
}

expectedAnnotation := expectedBytes.String()

cgAnnotation, ok := trig.Status.Annotations[kafka.GroupIdAnnotation]
if !ok {
t.Fatal("no consumer group annotation present on the trigger")
}

if cgAnnotation != expectedAnnotation {
t.Fatalf("consumer group id annotation was not equal to expected value. expected %s, got %s", expectedAnnotation, cgAnnotation)
}
}
}

0 comments on commit 2792efb

Please sign in to comment.