diff --git a/controller/cache/cache.go b/controller/cache/cache.go index b17afbca5234b..266c6ab0fd6cc 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -69,6 +69,12 @@ const ( // EnvClusterCacheRetryUseBackoff is the env variable to control whether to use a backoff strategy with the retry during cluster cache sync EnvClusterCacheRetryUseBackoff = "ARGOCD_CLUSTER_CACHE_RETRY_USE_BACKOFF" + // EnvClusterCacheBatchEventsProcessing is the env variable to control whether to enable batch events processing + EnvClusterCacheBatchEventsProcessing = "ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING" + + // EnvClusterCacheEventProcessingInterval is the env variable to control the interval between processing events when BatchEventsProcessing is enabled + EnvClusterCacheEventProcessingInterval = "ARGOCD_CLUSTER_CACHE_EVENT_PROCESSING_INTERVAL" + // AnnotationIgnoreResourceUpdates when set to true on an untracked resource, // argo will apply `ignoreResourceUpdates` configuration on it. AnnotationIgnoreResourceUpdates = "argocd.argoproj.io/ignore-resource-updates" @@ -103,6 +109,12 @@ var ( // clusterCacheRetryUseBackoff specifies whether to use a backoff strategy on cluster cache sync, if retry is enabled clusterCacheRetryUseBackoff bool = false + + // clusterCacheBatchEventsProcessing specifies whether to enable batch events processing + clusterCacheBatchEventsProcessing bool = false + + // clusterCacheEventProcessingInterval specifies the interval between processing events when BatchEventsProcessing is enabled + clusterCacheEventProcessingInterval = 100 * time.Millisecond ) func init() { @@ -114,6 +126,8 @@ func init() { clusterCacheListSemaphoreSize = env.ParseInt64FromEnv(EnvClusterCacheListSemaphore, clusterCacheListSemaphoreSize, 0, math.MaxInt64) clusterCacheAttemptLimit = int32(env.ParseNumFromEnv(EnvClusterCacheAttemptLimit, int(clusterCacheAttemptLimit), 1, math.MaxInt32)) clusterCacheRetryUseBackoff = env.ParseBoolFromEnv(EnvClusterCacheRetryUseBackoff, false) + clusterCacheBatchEventsProcessing = env.ParseBoolFromEnv(EnvClusterCacheBatchEventsProcessing, false) + clusterCacheEventProcessingInterval = env.ParseDurationFromEnv(EnvClusterCacheEventProcessingInterval, clusterCacheEventProcessingInterval, 0, math.MaxInt64) } type LiveStateCache interface { @@ -554,6 +568,8 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e clustercache.SetLogr(logutils.NewLogrusLogger(log.WithField("server", cluster.Server))), clustercache.SetRetryOptions(clusterCacheAttemptLimit, clusterCacheRetryUseBackoff, isRetryableError), clustercache.SetRespectRBAC(respectRBAC), + clustercache.SetBatchEventsProcessing(clusterCacheBatchEventsProcessing), + clustercache.SetEventProcessingInterval(clusterCacheEventProcessingInterval), } clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...) @@ -608,6 +624,10 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind) }) + _ = clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) { + c.metricsServer.ObserveResourceEventsProcessingDuration(cluster.Server, duration, processedEventsNumber) + }) + c.clusters[server] = clusterCache return clusterCache, nil diff --git a/controller/metrics/metrics.go b/controller/metrics/metrics.go index b8674cd231b09..70339499e0a13 100644 --- a/controller/metrics/metrics.go +++ b/controller/metrics/metrics.go @@ -30,18 +30,20 @@ import ( type MetricsServer struct { *http.Server - syncCounter *prometheus.CounterVec - kubectlExecCounter *prometheus.CounterVec - kubectlExecPendingGauge *prometheus.GaugeVec - orphanedResourcesGauge *prometheus.GaugeVec - k8sRequestCounter *prometheus.CounterVec - clusterEventsCounter *prometheus.CounterVec - redisRequestCounter *prometheus.CounterVec - reconcileHistogram *prometheus.HistogramVec - redisRequestHistogram *prometheus.HistogramVec - registry *prometheus.Registry - hostname string - cron *cron.Cron + syncCounter *prometheus.CounterVec + kubectlExecCounter *prometheus.CounterVec + kubectlExecPendingGauge *prometheus.GaugeVec + orphanedResourcesGauge *prometheus.GaugeVec + k8sRequestCounter *prometheus.CounterVec + clusterEventsCounter *prometheus.CounterVec + redisRequestCounter *prometheus.CounterVec + reconcileHistogram *prometheus.HistogramVec + redisRequestHistogram *prometheus.HistogramVec + resourceEventsProcessingHistogram *prometheus.HistogramVec + resourceEventsNumberGauge *prometheus.GaugeVec + registry *prometheus.Registry + hostname string + cron *cron.Cron } const ( @@ -153,6 +155,20 @@ var ( }, descAppDefaultLabels, ) + + resourceEventsProcessingHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "argocd_resource_events_processing", + Help: "Time to process resource events in seconds.", + Buckets: []float64{0.25, .5, 1, 2, 4, 8, 16}, + }, + []string{"server"}, + ) + + resourceEventsNumberGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "argocd_resource_events_processed_in_batch", + Help: "Number of resource events processed in batch", + }, []string{"server"}) ) // NewMetricsServer returns a new prometheus server which collects application metrics @@ -202,6 +218,8 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, appFil registry.MustRegister(clusterEventsCounter) registry.MustRegister(redisRequestCounter) registry.MustRegister(redisRequestHistogram) + registry.MustRegister(resourceEventsProcessingHistogram) + registry.MustRegister(resourceEventsNumberGauge) return &MetricsServer{ registry: registry, @@ -209,16 +227,18 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, appFil Addr: addr, Handler: mux, }, - syncCounter: syncCounter, - k8sRequestCounter: k8sRequestCounter, - kubectlExecCounter: kubectlExecCounter, - kubectlExecPendingGauge: kubectlExecPendingGauge, - orphanedResourcesGauge: orphanedResourcesGauge, - reconcileHistogram: reconcileHistogram, - clusterEventsCounter: clusterEventsCounter, - redisRequestCounter: redisRequestCounter, - redisRequestHistogram: redisRequestHistogram, - hostname: hostname, + syncCounter: syncCounter, + k8sRequestCounter: k8sRequestCounter, + kubectlExecCounter: kubectlExecCounter, + kubectlExecPendingGauge: kubectlExecPendingGauge, + orphanedResourcesGauge: orphanedResourcesGauge, + reconcileHistogram: reconcileHistogram, + clusterEventsCounter: clusterEventsCounter, + redisRequestCounter: redisRequestCounter, + redisRequestHistogram: redisRequestHistogram, + resourceEventsProcessingHistogram: resourceEventsProcessingHistogram, + resourceEventsNumberGauge: resourceEventsNumberGauge, + hostname: hostname, // This cron is used to expire the metrics cache. // Currently clearing the metrics cache is logging and deleting from the map // so there is no possibility of panic, but we will add a chain to keep robfig/cron v1 behavior. @@ -284,6 +304,12 @@ func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) { m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds()) } +// ObserveResourceEventsProcessingDuration observes resource events processing duration +func (m *MetricsServer) ObserveResourceEventsProcessingDuration(server string, duration time.Duration, processedEventsNumber int) { + m.resourceEventsProcessingHistogram.WithLabelValues(server).Observe(duration.Seconds()) + m.resourceEventsNumberGauge.WithLabelValues(server).Set(float64(processedEventsNumber)) +} + // IncReconcile increments the reconcile counter for an application func (m *MetricsServer) IncReconcile(app *argoappv1.Application, duration time.Duration) { m.reconcileHistogram.WithLabelValues(app.Namespace, app.Spec.Destination.Server).Observe(duration.Seconds()) @@ -311,6 +337,8 @@ func (m *MetricsServer) SetExpiration(cacheExpiration time.Duration) error { m.redisRequestCounter.Reset() m.reconcileHistogram.Reset() m.redisRequestHistogram.Reset() + m.resourceEventsProcessingHistogram.Reset() + m.resourceEventsNumberGauge.Reset() }) if err != nil { return err diff --git a/docs/operator-manual/high_availability.md b/docs/operator-manual/high_availability.md index ddcce80fab25a..98e8a709f8852 100644 --- a/docs/operator-manual/high_availability.md +++ b/docs/operator-manual/high_availability.md @@ -130,6 +130,15 @@ stringData: count (grouped by k8s api version, the granule of parallelism for list operations). In this case, all resources will be buffered in memory -- no api server request will be blocked by processing. +* `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING` - environment variable that enables the controller to collect events + for Kubernetes resources and process them in a batch. This is useful when the cluster contains a large number of resources, + and the controller is overwhelmed by the number of events. The default value is `false`, which means that the controller + processes events one by one. + +* `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING_INTERVAL` - environment variable controlling the interval for processing events in a batch. + The valid value is in the format of Go time duration string, e.g. `1ms`, `1s`, `1m`, `1h`. The default value is `100ms`. + The variable is used only when `ARGOCD_CLUSTER_CACHE_BATCH_EVENTS_PROCESSING` is set to `true`. + * `ARGOCD_APPLICATION_TREE_SHARD_SIZE` - environment variable controlling the max number of resources stored in one Redis key. Splitting application tree into multiple keys helps to reduce the amount of traffic between the controller and Redis. The default value is 0, which means that the application tree is stored in a single Redis key. The reasonable value is 100. diff --git a/docs/operator-manual/metrics.md b/docs/operator-manual/metrics.md index 36d3d4dd03b37..91cfdadf826e6 100644 --- a/docs/operator-manual/metrics.md +++ b/docs/operator-manual/metrics.md @@ -24,6 +24,8 @@ Metrics about applications. Scraped at the `argocd-metrics:8082/metrics` endpoin | `argocd_kubectl_exec_total` | counter | Number of kubectl executions | | `argocd_redis_request_duration` | histogram | Redis requests duration. | | `argocd_redis_request_total` | counter | Number of redis requests executed during application reconciliation | +| `argocd_resource_events_processing` | histogram | Time to process resource events in batch in seconds | +| `argocd_resource_events_processed_in_batch` | gauge | Number of resource events processed in batch | If you use Argo CD with many application and project creation and deletion, the metrics page will keep in cache your application and project's history. diff --git a/go.mod b/go.mod index b3fe09bfb6f19..7792ef5985c6a 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/TomOnTime/utfutil v0.0.0-20180511104225-09c41003ee1d github.com/alicebob/miniredis/v2 v2.33.0 github.com/antonmedv/expr v1.15.1 - github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55 + github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431 github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1 github.com/aws/aws-sdk-go v1.55.5 diff --git a/go.sum b/go.sum index 40dac1ab3fa8a..8ee2d235addc9 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ github.com/antonmedv/expr v1.15.1/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4J github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/appscode/go v0.0.0-20191119085241-0887d8ec2ecc/go.mod h1:OawnOmAL4ZX3YaPdN+8HTNwBveT1jMsqP74moa9XUbE= -github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55 h1:GEyH0LY7gB/BKnahEl6pgNHc6EdjX308b090tn6odKc= -github.com/argoproj/gitops-engine v0.7.1-0.20241211202847-8849c3f30c55/go.mod h1:WsnykM8idYRUnneeT31cM/Fq/ZsjkefCbjiD8ioCJkU= +github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431 h1:ku0Gzp1dHr7yn83B/xmMrmbB5sJbe32LXaYSDSBd6/c= +github.com/argoproj/gitops-engine v0.7.1-0.20241216155226-54992bf42431/go.mod h1:WsnykM8idYRUnneeT31cM/Fq/ZsjkefCbjiD8ioCJkU= github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd h1:lOVVoK89j9Nd4+JYJiKAaMNYC1402C0jICROOfUPWn0= github.com/argoproj/notifications-engine v0.4.1-0.20241007194503-2fef5c9049fd/go.mod h1:N0A4sEws2soZjEpY4hgZpQS8mRIEw6otzwfkgc3g9uQ= github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1 h1:qsHwwOJ21K2Ao0xPju1sNuqphyMnMYkyB3ZLoLtxWpo= diff --git a/util/env/env.go b/util/env/env.go index e9c2ff41d393e..686ef8089154c 100644 --- a/util/env/env.go +++ b/util/env/env.go @@ -7,8 +7,6 @@ import ( "strings" "time" - timeutil "github.com/argoproj/pkg/time" - log "github.com/sirupsen/logrus" ) @@ -133,13 +131,12 @@ func ParseDurationFromEnv(env string, defaultValue, min, max time.Duration) time if str == "" { return defaultValue } - durPtr, err := timeutil.ParseDuration(str) + dur, err := time.ParseDuration(str) if err != nil { log.Warnf("Could not parse '%s' as a duration string from environment %s", str, env) return defaultValue } - dur := *durPtr if dur < min { log.Warnf("Value in %s is %s, which is less than minimum %s allowed", env, dur, min) return defaultValue diff --git a/util/env/env_test.go b/util/env/env_test.go index 5be065908084d..48da08b0668f2 100644 --- a/util/env/env_test.go +++ b/util/env/env_test.go @@ -115,6 +115,10 @@ func TestParseDurationFromEnv(t *testing.T) { name: "ValidValueSet", env: "2s", expected: time.Second * 2, + }, { + name: "ValidValueSetMs", + env: "2500ms", + expected: time.Millisecond * 2500, }, { name: "MoreThanMaxSet", env: "6s",