Skip to content

Commit

Permalink
fix: avoid resources lock contention (#8172) (#20329)
Browse files Browse the repository at this point in the history
* fix: avoid resources lock contention

Signed-off-by: Mykola Pelekh <[email protected]>

* feat: allow enabling batch events processing

Signed-off-by: Mykola Pelekh <[email protected]>

* fix: update ParseDurationFromEnv to handle duration in ms

Signed-off-by: Mykola Pelekh <[email protected]>

* feat: make eventProcessingInterval option configurable (default is 0.1s)

Signed-off-by: Mykola Pelekh <[email protected]>

* use upstream gitops-engine

Signed-off-by: Michael Crenshaw <[email protected]>

---------

Signed-off-by: Mykola Pelekh <[email protected]>
Signed-off-by: Michael Crenshaw <[email protected]>
Co-authored-by: Michael Crenshaw <[email protected]>
  • Loading branch information
mpelekh and crenshaw-dev authored Dec 16, 2024
1 parent c090f84 commit dc3f40c
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 29 deletions.
20 changes: 20 additions & 0 deletions controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ const (
// EnvClusterCacheRetryUseBackoff is the env variable to control whether to use a backoff strategy with the retry during cluster cache sync
EnvClusterCacheRetryUseBackoff = "ARGOCD_CLUSTER_CACHE_RETRY_USE_BACKOFF"

// EnvClusterCacheBatchEventsProcessing is the env variable to control whether to enable batch events processing
EnvClusterCacheBatchEventsProcessing = "ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING"

// EnvClusterCacheEventProcessingInterval is the env variable to control the interval between processing events when BatchEventsProcessing is enabled
EnvClusterCacheEventProcessingInterval = "ARGOCD_CLUSTER_CACHE_EVENT_PROCESSING_INTERVAL"

// AnnotationIgnoreResourceUpdates when set to true on an untracked resource,
// argo will apply `ignoreResourceUpdates` configuration on it.
AnnotationIgnoreResourceUpdates = "argocd.argoproj.io/ignore-resource-updates"
Expand Down Expand Up @@ -103,6 +109,12 @@ var (

// clusterCacheRetryUseBackoff specifies whether to use a backoff strategy on cluster cache sync, if retry is enabled
clusterCacheRetryUseBackoff bool = false

// clusterCacheBatchEventsProcessing specifies whether to enable batch events processing
clusterCacheBatchEventsProcessing bool = false

// clusterCacheEventProcessingInterval specifies the interval between processing events when BatchEventsProcessing is enabled
clusterCacheEventProcessingInterval = 100 * time.Millisecond
)

func init() {
Expand All @@ -114,6 +126,8 @@ func init() {
clusterCacheListSemaphoreSize = env.ParseInt64FromEnv(EnvClusterCacheListSemaphore, clusterCacheListSemaphoreSize, 0, math.MaxInt64)
clusterCacheAttemptLimit = int32(env.ParseNumFromEnv(EnvClusterCacheAttemptLimit, int(clusterCacheAttemptLimit), 1, math.MaxInt32))
clusterCacheRetryUseBackoff = env.ParseBoolFromEnv(EnvClusterCacheRetryUseBackoff, false)
clusterCacheBatchEventsProcessing = env.ParseBoolFromEnv(EnvClusterCacheBatchEventsProcessing, false)
clusterCacheEventProcessingInterval = env.ParseDurationFromEnv(EnvClusterCacheEventProcessingInterval, clusterCacheEventProcessingInterval, 0, math.MaxInt64)
}

type LiveStateCache interface {
Expand Down Expand Up @@ -554,6 +568,8 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
clustercache.SetLogr(logutils.NewLogrusLogger(log.WithField("server", cluster.Server))),
clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError),
clustercache.SetRespectRBAC(respectRBAC),
clustercache.SetBatchEventsProcessing(clusterCacheBatchEventsProcessing),
clustercache.SetEventProcessingInterval(clusterCacheEventProcessingInterval),
}

clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...)
Expand Down Expand Up @@ -608,6 +624,10 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind)
})

_ = clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
c.metricsServer.ObserveResourceEventsProcessingDuration(cluster.Server, duration, processedEventsNumber)
})

c.clusters[server] = clusterCache

return clusterCache, nil
Expand Down
72 changes: 50 additions & 22 deletions controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,20 @@ import (

type MetricsServer struct {
*http.Server
syncCounter *prometheus.CounterVec
kubectlExecCounter *prometheus.CounterVec
kubectlExecPendingGauge *prometheus.GaugeVec
orphanedResourcesGauge *prometheus.GaugeVec
k8sRequestCounter *prometheus.CounterVec
clusterEventsCounter *prometheus.CounterVec
redisRequestCounter *prometheus.CounterVec
reconcileHistogram *prometheus.HistogramVec
redisRequestHistogram *prometheus.HistogramVec
registry *prometheus.Registry
hostname string
cron *cron.Cron
syncCounter *prometheus.CounterVec
kubectlExecCounter *prometheus.CounterVec
kubectlExecPendingGauge *prometheus.GaugeVec
orphanedResourcesGauge *prometheus.GaugeVec
k8sRequestCounter *prometheus.CounterVec
clusterEventsCounter *prometheus.CounterVec
redisRequestCounter *prometheus.CounterVec
reconcileHistogram *prometheus.HistogramVec
redisRequestHistogram *prometheus.HistogramVec
resourceEventsProcessingHistogram *prometheus.HistogramVec
resourceEventsNumberGauge *prometheus.GaugeVec
registry *prometheus.Registry
hostname string
cron *cron.Cron
}

const (
Expand Down Expand Up @@ -153,6 +155,20 @@ var (
},
descAppDefaultLabels,
)

resourceEventsProcessingHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "argocd_resource_events_processing",
Help: "Time to process resource events in seconds.",
Buckets: []float64{0.25, .5, 1, 2, 4, 8, 16},
},
[]string{"server"},
)

resourceEventsNumberGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "argocd_resource_events_processed_in_batch",
Help: "Number of resource events processed in batch",
}, []string{"server"})
)

// NewMetricsServer returns a new prometheus server which collects application metrics
Expand Down Expand Up @@ -202,23 +218,27 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, appFil
registry.MustRegister(clusterEventsCounter)
registry.MustRegister(redisRequestCounter)
registry.MustRegister(redisRequestHistogram)
registry.MustRegister(resourceEventsProcessingHistogram)
registry.MustRegister(resourceEventsNumberGauge)

return &MetricsServer{
registry: registry,
Server: &http.Server{
Addr: addr,
Handler: mux,
},
syncCounter: syncCounter,
k8sRequestCounter: k8sRequestCounter,
kubectlExecCounter: kubectlExecCounter,
kubectlExecPendingGauge: kubectlExecPendingGauge,
orphanedResourcesGauge: orphanedResourcesGauge,
reconcileHistogram: reconcileHistogram,
clusterEventsCounter: clusterEventsCounter,
redisRequestCounter: redisRequestCounter,
redisRequestHistogram: redisRequestHistogram,
hostname: hostname,
syncCounter: syncCounter,
k8sRequestCounter: k8sRequestCounter,
kubectlExecCounter: kubectlExecCounter,
kubectlExecPendingGauge: kubectlExecPendingGauge,
orphanedResourcesGauge: orphanedResourcesGauge,
reconcileHistogram: reconcileHistogram,
clusterEventsCounter: clusterEventsCounter,
redisRequestCounter: redisRequestCounter,
redisRequestHistogram: redisRequestHistogram,
resourceEventsProcessingHistogram: resourceEventsProcessingHistogram,
resourceEventsNumberGauge: resourceEventsNumberGauge,
hostname: hostname,
// This cron is used to expire the metrics cache.
// Currently clearing the metrics cache is logging and deleting from the map
// so there is no possibility of panic, but we will add a chain to keep robfig/cron v1 behavior.
Expand Down Expand Up @@ -284,6 +304,12 @@ func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds())
}

// ObserveResourceEventsProcessingDuration observes resource events processing duration
func (m *MetricsServer) ObserveResourceEventsProcessingDuration(server string, duration time.Duration, processedEventsNumber int) {
m.resourceEventsProcessingHistogram.WithLabelValues(server).Observe(duration.Seconds())
m.resourceEventsNumberGauge.WithLabelValues(server).Set(float64(processedEventsNumber))
}

// IncReconcile increments the reconcile counter for an application
func (m *MetricsServer) IncReconcile(app *argoappv1.Application, duration time.Duration) {
m.reconcileHistogram.WithLabelValues(app.Namespace, app.Spec.Destination.Server).Observe(duration.Seconds())
Expand Down Expand Up @@ -311,6 +337,8 @@ func (m *MetricsServer) SetExpiration(cacheExpiration time.Duration) error {
m.redisRequestCounter.Reset()
m.reconcileHistogram.Reset()
m.redisRequestHistogram.Reset()
m.resourceEventsProcessingHistogram.Reset()
m.resourceEventsNumberGauge.Reset()
})
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions docs/operator-manual/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ stringData:
count (grouped by k8s api version, the granule of parallelism for list operations). In this case, all resources will
be buffered in memory -- no api server request will be blocked by processing.

* `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING` - environment variable that enables the controller to collect events
for Kubernetes resources and process them in a batch. This is useful when the cluster contains a large number of resources,
and the controller is overwhelmed by the number of events. The default value is `false`, which means that the controller
processes events one by one.

* `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING_INTERVAL` - environment variable controlling the interval for processing events in a batch.
The valid value is in the format of Go time duration string, e.g. `1ms`, `1s`, `1m`, `1h`. The default value is `100ms`.
The variable is used only when `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING` is set to `true`.

* `ARGOCD_APPLICATION_TREE_SHARD_SIZE` - environment variable controlling the max number of resources stored in one Redis
key. Splitting application tree into multiple keys helps to reduce the amount of traffic between the controller and Redis.
The default value is 0, which means that the application tree is stored in a single Redis key. The reasonable value is 100.
Expand Down
2 changes: 2 additions & 0 deletions docs/operator-manual/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Metrics about applications. Scraped at the `argocd-metrics:8082/metrics` endpoin
| `argocd_kubectl_exec_total` | counter | Number of kubectl executions |
| `argocd_redis_request_duration` | histogram | Redis requests duration. |
| `argocd_redis_request_total` | counter | Number of redis requests executed during application reconciliation |
| `argocd_resource_events_processing` | histogram | Time to process resource events in batch in seconds |
| `argocd_resource_events_processed_in_batch` | gauge | Number of resource events processed in batch |

If you use Argo CD with many application and project creation and deletion,
the metrics page will keep in cache your application and project's history.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/TomOnTime/utfutil v0.0.0-20180511104225-09c41003ee1d
github.com/alicebob/miniredis/v2 v2.33.0
github.com/antonmedv/expr v1.15.1
github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55
github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431
github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1
github.com/aws/aws-sdk-go v1.55.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ github.com/antonmedv/expr v1.15.1/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4J
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/appscode/go v0.0.0-20191119085241-0887d8ec2ecc/go.mod h1:OawnOmAL4ZX3YaPdN+8HTNwBveT1jMsqP74moa9XUbE=
github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55 h1:GEyH0LY7gB/BKnahEl6pgNHc6EdjX308b090tn6odKc=
github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55/go.mod h1:WsnykM8idYRUnneeT31cM/Fq/ZsjkefCbjiD8ioCJkU=
github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431 h1:ku0Gzp1dHr7yn83B/xmMrmbB5sJbe32LXaYSDSBd6/c=
github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431/go.mod h1:WsnykM8idYRUnneeT31cM/Fq/ZsjkefCbjiD8ioCJkU=
github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd h1:lOVVoK89j9Nd4+JYJiKAaMNYC1402C0jICROOfUPWn0=
github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd/go.mod h1:N0A4sEws2soZjEpY4hgZpQS8mRIEw6otzwfkgc3g9uQ=
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1 h1:qsHwwOJ21K2Ao0xPju1sNuqphyMnMYkyB3ZLoLtxWpo=
Expand Down
5 changes: 1 addition & 4 deletions util/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"strings"
"time"

timeutil "github.com/argoproj/pkg/time"

log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -133,13 +131,12 @@ func ParseDurationFromEnv(env string, defaultValue, min, max time.Duration) time
if str == "" {
return defaultValue
}
durPtr, err := timeutil.ParseDuration(str)
dur, err := time.ParseDuration(str)
if err != nil {
log.Warnf("Could not parse '%s' as a duration string from environment %s", str, env)
return defaultValue
}

dur := *durPtr
if dur < min {
log.Warnf("Value in %s is %s, which is less than minimum %s allowed", env, dur, min)
return defaultValue
Expand Down
4 changes: 4 additions & 0 deletions util/env/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func TestParseDurationFromEnv(t *testing.T) {
name: "ValidValueSet",
env: "2s",
expected: time.Second * 2,
}, {
name: "ValidValueSetMs",
env: "2500ms",
expected: time.Millisecond * 2500,
}, {
name: "MoreThanMaxSet",
env: "6s",
Expand Down

0 comments on commit dc3f40c

Please sign in to comment.