Skip to content

Commit

Permalink
notify leader election subscribers on leadership state change (#32323)
Browse files Browse the repository at this point in the history
  • Loading branch information
adel121 authored Dec 22, 2024
1 parent 7f64411 commit 847ecc3
Show file tree
Hide file tree
Showing 19 changed files with 169 additions and 111 deletions.
30 changes: 14 additions & 16 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,12 @@ func start(log log.Component,
if config.GetBool("admission_controller.enabled") {
if config.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
ClusterID: clusterID,
StopCh: stopCh,
LeadershipStateSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
ClusterID: clusterID,
StopCh: stopCh,
}
if err := admissionpatch.StartControllers(patchCtx); err != nil {
log.Errorf("Cannot start auto instrumentation patcher: %v", err)
Expand All @@ -471,15 +470,14 @@ func start(log log.Component,
}

admissionCtx := admissionpkg.ControllerContext{
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
SecretInformers: apiCl.CertificateSecretInformerFactory,
ValidatingInformers: apiCl.WebhookConfigInformerFactory,
MutatingInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
StopCh: stopCh,
ValidatingStopCh: validatingStopCh,
Demultiplexer: demultiplexer,
LeadershipStateSubscribeFunc: le.Subscribe,
SecretInformers: apiCl.CertificateSecretInformerFactory,
ValidatingInformers: apiCl.WebhookConfigInformerFactory,
MutatingInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
StopCh: stopCh,
ValidatingStopCh: validatingStopCh,
Demultiplexer: demultiplexer,
}

webhooks, err := admissionpkg.StartControllers(admissionCtx, wmeta, pa, datadogConfig)
Expand Down
32 changes: 17 additions & 15 deletions pkg/clusteragent/admission/controllers/secret/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ import (
// Controller is responsible for creating and refreshing the Secret object
// that contains the certificate of the Admission Webhook.
type Controller struct {
clientSet kubernetes.Interface
secretsLister corelisters.SecretLister
secretsSynced cache.InformerSynced
config Config
dnsNames []string
dnsNamesDigest uint64
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
isLeaderNotif <-chan struct{}
clientSet kubernetes.Interface
secretsLister corelisters.SecretLister
secretsSynced cache.InformerSynced
config Config
dnsNames []string
dnsNamesDigest uint64
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
leadershipStateNotif <-chan struct{}
}

// NewController returns a new Secret Controller.
func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, isLeaderNotif <-chan struct{}, config Config) *Controller {
func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config) *Controller {
dnsNames := generateDNSNames(config.GetNs(), config.GetSvc())
controller := &Controller{
clientSet: client,
Expand All @@ -59,8 +59,8 @@ func NewController(client kubernetes.Interface, secretInformer coreinformers.Sec
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "secrets"},
),
isLeaderFunc: isLeaderFunc,
isLeaderNotif: isLeaderNotif,
isLeaderFunc: isLeaderFunc,
leadershipStateNotif: leadershipStateNotif,
}
if _, err := secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
Expand Down Expand Up @@ -101,9 +101,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
func (c *Controller) enqueueOnLeaderNotif(stop <-chan struct{}) {
for {
select {
case <-c.isLeaderNotif:
log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName())
c.triggerReconciliation()
case <-c.leadershipStateNotif:
if c.isLeaderFunc() {
log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName())
c.triggerReconciliation()
}
case <-stop:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (f *fixture) run(stopCh <-chan struct{}) *Controller {
f.client,
factory.Core().V1().Secrets(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
cfg,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ func NewController(
validatingInformers admissionregistration.Interface,
mutatingInformers admissionregistration.Interface,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan struct{},
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
datadogConfig config.Component,
demultiplexer demultiplexer.Component,
) Controller {
if config.useAdmissionV1() {
return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer)
return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer)
}
return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer)
return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer)
}

// Webhook represents an admission webhook
Expand Down Expand Up @@ -162,7 +162,7 @@ type controllerBase struct {
mutatingWebhooksSynced cache.InformerSynced //nolint:structcheck
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
isLeaderNotif <-chan struct{}
leadershipStateNotif <-chan struct{}
webhooks []Webhook
}

Expand All @@ -186,9 +186,11 @@ func (c *controllerBase) EnabledWebhooks() []Webhook {
func (c *controllerBase) enqueueOnLeaderNotif(stop <-chan struct{}) {
for {
select {
case <-c.isLeaderNotif:
log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName())
c.triggerReconciliation()
case <-c.leadershipStateNotif:
if c.isLeaderFunc() {
log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName())
c.triggerReconciliation()
}
case <-stop:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestNewController(t *testing.T) {
factory.Admissionregistration(),
factory.Admissionregistration(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1Cfg(t),
wmeta,
nil,
Expand All @@ -52,7 +52,7 @@ func TestNewController(t *testing.T) {
factory.Admissionregistration(),
factory.Admissionregistration(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1beta1Cfg(t),
wmeta,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewControllerV1(
validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer,
mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan struct{},
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
Expand All @@ -75,7 +75,7 @@ func NewControllerV1(
workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"},
)
controller.isLeaderFunc = isLeaderFunc
controller.isLeaderNotif = isLeaderNotif
controller.leadershipStateNotif = leadershipStateNotif
controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer)
controller.generateTemplates()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func (f *fixtureV1) createController() (*ControllerV1, informers.SharedInformerF
factory.Admissionregistration().V1().ValidatingWebhookConfigurations(),
factory.Admissionregistration().V1().MutatingWebhookConfigurations(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1Cfg(f.t),
wmeta,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewControllerV1beta1(
validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer,
mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan struct{},
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
Expand All @@ -76,7 +76,7 @@ func NewControllerV1beta1(
workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"},
)
controller.isLeaderFunc = isLeaderFunc
controller.isLeaderNotif = isLeaderNotif
controller.leadershipStateNotif = leadershipStateNotif
controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer)
controller.generateTemplates()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ func (f *fixtureV1beta1) createController() (*ControllerV1beta1, informers.Share
factory.Admissionregistration().V1beta1().ValidatingWebhookConfigurations(),
factory.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1beta1Cfg(f.t),
wmeta,
nil,
Expand Down
24 changes: 14 additions & 10 deletions pkg/clusteragent/admission/patch/file_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
// filePatchProvider this is a stub and will be used for e2e testing only
type filePatchProvider struct {
file string
isLeaderNotif <-chan struct{}
leadershipStateNotif <-chan struct{}
isLeaderFunc func() bool
pollInterval time.Duration
subscribers map[TargetObjKind]chan Request
lastSuccessfulRefresh time.Time
Expand All @@ -27,13 +28,14 @@ type filePatchProvider struct {

var _ patchProvider = &filePatchProvider{}

func newfileProvider(file string, isLeaderNotif <-chan struct{}, clusterName string) *filePatchProvider {
func newfileProvider(file string, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, clusterName string) *filePatchProvider {
return &filePatchProvider{
file: file,
isLeaderNotif: isLeaderNotif,
pollInterval: 15 * time.Second,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
file: file,
leadershipStateNotif: leadershipStateNotif,
pollInterval: 15 * time.Second,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
isLeaderFunc: isLeaderFunc,
}
}

Expand All @@ -49,9 +51,11 @@ func (fpp *filePatchProvider) start(stopCh <-chan struct{}) {
defer ticker.Stop()
for {
select {
case <-fpp.isLeaderNotif:
log.Info("Got a leader notification, polling from file")
fpp.process(true)
case <-fpp.leadershipStateNotif:
if fpp.isLeaderFunc() {
log.Info("Got a leader notification, polling from file")
fpp.process(true)
}
case <-ticker.C:
fpp.process(false)
case <-stopCh:
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/admission/patch/file_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestFileProviderProcess(t *testing.T) {
fpp := newfileProvider("testdata/auto-instru.json", make(chan struct{}), "dev")
fpp := newfileProvider("testdata/auto-instru.json", func() bool { return true }, make(<-chan struct{}), "dev")
notifs := fpp.subscribe(KindDeployment)
fpp.process(false)
require.Len(t, notifs, 1)
Expand Down
6 changes: 3 additions & 3 deletions pkg/clusteragent/admission/patch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type patchProvider interface {
subscribe(kind TargetObjKind) chan Request
}

func newPatchProvider(rcClient *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) {
func newPatchProvider(rcClient *rcclient.Client, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) {
if pkgconfigsetup.IsRemoteConfigEnabled(pkgconfigsetup.Datadog()) {
return newRemoteConfigProvider(rcClient, isLeaderNotif, telemetryCollector, clusterName)
return newRemoteConfigProvider(rcClient, isLeaderFunc, leadershipStateNotif, telemetryCollector, clusterName)
}
if pkgconfigsetup.Datadog().GetBool("admission_controller.auto_instrumentation.patcher.fallback_to_file_provider") {
// Use the file config provider for e2e testing only (it replaces RC as a source of configs)
file := pkgconfigsetup.Datadog().GetString("admission_controller.auto_instrumentation.patcher.file_provider_path")
return newfileProvider(file, isLeaderNotif, clusterName), nil
return newfileProvider(file, isLeaderFunc, leadershipStateNotif, clusterName), nil
}
return nil, errors.New("remote config is disabled")
}
32 changes: 18 additions & 14 deletions pkg/clusteragent/admission/patch/rc_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,27 @@ import (

// remoteConfigProvider consumes tracing configs from RC and delivers them to the patcher
type remoteConfigProvider struct {
client *rcclient.Client
isLeaderNotif <-chan struct{}
subscribers map[TargetObjKind]chan Request
clusterName string
telemetryCollector telemetry.TelemetryCollector
client *rcclient.Client
leadershipStateNotif <-chan struct{}
isLeaderFunc func() bool
subscribers map[TargetObjKind]chan Request
clusterName string
telemetryCollector telemetry.TelemetryCollector
}

var _ patchProvider = &remoteConfigProvider{}

func newRemoteConfigProvider(client *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (*remoteConfigProvider, error) {
func newRemoteConfigProvider(client *rcclient.Client, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (*remoteConfigProvider, error) {
if client == nil {
return nil, errors.New("remote config client not initialized")
}
return &remoteConfigProvider{
client: client,
isLeaderNotif: isLeaderNotif,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
telemetryCollector: telemetryCollector,
client: client,
leadershipStateNotif: leadershipStateNotif,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
telemetryCollector: telemetryCollector,
isLeaderFunc: isLeaderFunc,
}, nil
}

Expand All @@ -48,9 +50,11 @@ func (rcp *remoteConfigProvider) start(stopCh <-chan struct{}) {
rcp.client.Start()
for {
select {
case <-rcp.isLeaderNotif:
log.Info("Got a leader notification, polling from remote-config")
rcp.process(rcp.client.GetConfigs(state.ProductAPMTracing), rcp.client.UpdateApplyStatus)
case <-rcp.leadershipStateNotif:
if rcp.isLeaderFunc() {
log.Info("Got a leader notification, polling from remote-config")
rcp.process(rcp.client.GetConfigs(state.ProductAPMTracing), rcp.client.UpdateApplyStatus)
}
case <-stopCh:
log.Info("Shutting down remote-config patch provider")
rcp.client.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusteragent/admission/patch/rc_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"testing"

"github.com/DataDog/datadog-agent/pkg/clusteragent/telemetry"

rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"

"github.com/stretchr/testify/require"
)

Expand All @@ -40,7 +40,7 @@ func TestProcess(t *testing.T) {
`
return []byte(fmt.Sprintf(base, cluster, kind))
}
rcp, err := newRemoteConfigProvider(&rcclient.Client{}, make(chan struct{}), telemetry.NewNoopCollector(), "dev")
rcp, err := newRemoteConfigProvider(&rcclient.Client{}, func() bool { return true }, make(<-chan struct{}), telemetry.NewNoopCollector(), "dev")
require.NoError(t, err)
notifs := rcp.subscribe(KindDeployment)
in := map[string]state.RawConfig{
Expand Down
Loading

0 comments on commit 847ecc3

Please sign in to comment.