From a02fd537607acb5701645e0d08e8b569991aa526 Mon Sep 17 00:00:00 2001 From: Anita Akaeze Date: Thu, 29 Aug 2024 15:19:28 -0700 Subject: [PATCH] NET-10124: Add metrics endpoint and labels to sync catalog deployment (#4212) add initial ideas --- .changelog/4212.txt | 3 + .../templates/sync-catalog-deployment.yaml | 22 ++ .../test/unit/sync-catalog-deployment.bats | 22 ++ charts/consul/values.yaml | 13 ++ control-plane/catalog/metrics/metrics.go | 65 ++++++ control-plane/catalog/to-consul/resource.go | 6 + control-plane/catalog/to-consul/syncer.go | 82 +++++++ .../catalog/to-consul/syncer_test.go | 2 + control-plane/catalog/to-k8s/sink.go | 46 ++++ control-plane/catalog/to-k8s/sink_test.go | 8 +- .../configentry_controller_test.go | 2 +- control-plane/go.mod | 4 +- .../subcommand/common/metrics_util.go | 29 +++ .../subcommand/gateway-resources/command.go | 41 +--- .../subcommand/sync-catalog/command.go | 110 ++++++++- .../sync-catalog/command_ent_test.go | 101 ++++----- .../subcommand/sync-catalog/command_test.go | 213 ++++++++++-------- 17 files changed, 564 insertions(+), 205 deletions(-) create mode 100644 .changelog/4212.txt create mode 100644 control-plane/catalog/metrics/metrics.go create mode 100644 control-plane/subcommand/common/metrics_util.go diff --git a/.changelog/4212.txt b/.changelog/4212.txt new file mode 100644 index 0000000000..d320896673 --- /dev/null +++ b/.changelog/4212.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +sync-catalog: expose prometheus scrape metrics on sync-catalog pods +``` \ No newline at end of file diff --git a/charts/consul/templates/sync-catalog-deployment.yaml b/charts/consul/templates/sync-catalog-deployment.yaml index 963e6b2485..94260b5e44 100644 --- a/charts/consul/templates/sync-catalog-deployment.yaml +++ b/charts/consul/templates/sync-catalog-deployment.yaml @@ -61,6 +61,13 @@ spec: "vault.hashicorp.com/namespace": "{{ .Values.global.secretsBackend.vault.vaultNamespace }}" {{- end }} {{- end }} + {{- if .Values.syncCatalog.metrics.enabled | default .Values.global.metrics.enabled }} + "prometheus.io/scrape": "true" + {{- if not (hasKey (default "" .Values.syncCatalog.annotations | fromYaml) "prometheus.io/path")}} + "prometheus.io/path": {{ default "/metrics" .Values.syncCatalog.metrics.path }} + {{- end }} + "prometheus.io/port": {{ .Values.syncCatalog.metrics.port | default "20300" | quote }} + {{- end }} spec: serviceAccountName: {{ template "consul.fullname" . }}-sync-catalog volumes: @@ -196,6 +203,16 @@ spec: {{- if .Values.syncCatalog.syncLoadBalancerEndpoints }} -sync-lb-services-endpoints=true \ {{- end }} + {{- if .Values.syncCatalog.metrics.enabled | default .Values.global.metrics.enabled }} + -enable-metrics \ + {{- end }} + {{- if .Values.syncCatalog.metrics.path }} + -metrics-path={{ .Values.syncCatalog.metrics.path }} \ + {{- end }} + {{- if .Values.syncCatalog.metrics.port }} + -metrics-port={{ .Values.syncCatalog.metrics.port }} \ + {{- end }} + -prometheus-retention-time={{ .Values.global.metrics.agentMetricsRetentionTime }} \ livenessProbe: httpGet: path: /health/ready @@ -220,6 +237,11 @@ spec: resources: {{- toYaml . | nindent 10 }} {{- end }} + {{- if or (eq (.Values.syncCatalog.metrics.enabled | toString) "-") .Values.syncCatalog.metrics.enabled .Values.global.metrics.enabled }} + ports: + - name: prometheus + containerPort: {{ .Values.syncCatalog.metrics.port | default "20300" | int }} + {{- end }} {{- if .Values.syncCatalog.priorityClassName }} priorityClassName: {{ .Values.syncCatalog.priorityClassName | quote }} {{- end }} diff --git a/charts/consul/test/unit/sync-catalog-deployment.bats b/charts/consul/test/unit/sync-catalog-deployment.bats index b60b030ac0..c3c6c4378a 100755 --- a/charts/consul/test/unit/sync-catalog-deployment.bats +++ b/charts/consul/test/unit/sync-catalog-deployment.bats @@ -1025,6 +1025,28 @@ load _helpers [ "${actual}" = "bar" ] } +@test "syncCatalog/Deployment: metrics annotations can be set" { + cd `chart_dir` + local object=$(helm template \ + -s templates/sync-catalog-deployment.yaml \ + --set 'syncCatalog.enabled=true' \ + --set 'syncCatalog.metrics.enabled=true' \ + . | tee /dev/stderr | + yq -r '.spec.template.metadata.annotations | + del(."consul.hashicorp.com/connect-inject") | + del(."consul.hashicorp.com/mesh-inject")' | + tee /dev/stderr) + + # Annotations to check + annotations=("prometheus.io/scrape" "prometheus.io/path" "prometheus.io/port") + + # Check each annotation + for annotation in "${annotations[@]}"; do + actual=$(echo "$object" | yq -r "has(\"$annotation\")") + [ "$actual" = "true" ] + done +} + #-------------------------------------------------------------------- # logLevel diff --git a/charts/consul/values.yaml b/charts/consul/values.yaml index 6f49cb67c8..934df7c46a 100644 --- a/charts/consul/values.yaml +++ b/charts/consul/values.yaml @@ -2203,6 +2203,19 @@ syncCatalog: # If false, LoadBalancer endpoints are not synced to Consul. syncLoadBalancerEndpoints: false + # Metrics settings for syncCatalog + metrics: + # This value enables or disables metrics collection for registered services, overriding the global metrics collection settings. + # @type: boolean + enabled: false + # This value sets the port to use for scraping syncCatalog metrics via prometheus, defaults to 20300 if not set. Must be in the port + # range of 1024-65535. + # @type: int + port: null + # This value sets the path to use for scraping syncCatalog metrics via prometheus, defaults to /metrics if not set. + # @type: string + path: null + ingress: # Syncs the hostname from a Kubernetes Ingress resource to service registrations # when a rule matched a service. Currently only supports host based routing and diff --git a/control-plane/catalog/metrics/metrics.go b/control-plane/catalog/metrics/metrics.go new file mode 100644 index 0000000000..17fabf0bdc --- /dev/null +++ b/control-plane/catalog/metrics/metrics.go @@ -0,0 +1,65 @@ +package metrics + +import ( + "strconv" + + "github.com/armon/go-metrics" + metricsutil "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" +) + +const ( + defaultScrapePort = 20300 + defaultScrapePath = "/metrics" +) + +type Config struct { + // EnableSyncCatalogMetrics indicates whether or not SyncCatalog metrics should be enabled + // by default on a deployed consul-sync-catalog, passed from the helm chart via command-line flags to our controller. + EnableSyncCatalogMetrics bool + + // The default path to use for scraping prometheus metrics, passed from the helm chart via command-line flags to our controller. + DefaultPrometheusScrapePath string + + // The default port to use for scraping prometheus metrics, passed from the helm chart via command-line flags to our controller. + DefaultPrometheusScrapePort int + + // Configures the retention time for metrics in the metrics store, passed from the helm chart via command-line flags to our controller. + PrometheusMetricsRetentionTime string +} + +func syncCatalogMetricsPort(portString string) int { + port, err := strconv.Atoi(portString) + if err != nil { + return defaultScrapePort + } + + if port < 1024 || port > 65535 { + // if we requested a privileged port, use the default + return defaultScrapePort + } + + return port +} + +func syncCatalogMetricsPath(path string) string { + if path, isSet := metricsutil.GetScrapePath(path); isSet { + return path + } + + // otherwise, fallback to the global helm setting + return defaultScrapePath +} + +func SyncCatalogMetricsConfig(enableMetrics bool, metricsPort, metricsPath string) Config { + return Config{ + EnableSyncCatalogMetrics: enableMetrics, + DefaultPrometheusScrapePort: syncCatalogMetricsPort(metricsPort), + DefaultPrometheusScrapePath: syncCatalogMetricsPath(metricsPath), + } +} + +func ServiceNameLabel(serviceName string) []metrics.Label { + return []metrics.Label{ + {Name: "service_name", Value: serviceName}, + } +} diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 879789d4b5..09e7e17200 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -11,6 +11,7 @@ import ( "sync" mapset "github.com/deckarep/golang-set" + "github.com/hashicorp/consul-k8s/control-plane/catalog/metrics" "github.com/hashicorp/consul-k8s/control-plane/helper/controller" "github.com/hashicorp/consul-k8s/control-plane/helper/parsetags" "github.com/hashicorp/consul-k8s/control-plane/namespaces" @@ -102,6 +103,11 @@ type ServiceResource struct { // LoadBalancerEndpointsSync set to true (default false) will sync ServiceTypeLoadBalancer endpoints. LoadBalancerEndpointsSync bool + // MetricsConfig contains metrics configuration and has methods to determine whether + // configuration should come from the default flags or annotations. The syncCatalog uses this to configure prometheus + // annotations. + MetricsConfig metrics.Config + // NodeExternalIPSync set to true (the default) syncs NodePort services // using the node's external ip address. When false, the node's internal // ip address will be used instead. diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 9f1df18ba6..76269630ec 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/armon/go-metrics" + "github.com/armon/go-metrics/prometheus" "github.com/cenkalti/backoff" mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-k8s/control-plane/consul" @@ -17,6 +19,41 @@ import ( "github.com/hashicorp/go-hclog" ) +var ( + baseName = []string{"consul", "sync_catalog", "to_consul"} + registerName = append(baseName, "register") + deregisterName = append(baseName, "deregister") + registerErrorName = append(baseName, "register", "error") + deregisterErrorName = append(baseName, "deregister", "error") + syncCatalogStatus = append(baseName, "status") +) + +var SyncToConsulCounters = []prometheus.CounterDefinition{ + { + Name: registerName, + Help: "Increments for each service instance registered to Consul via catalog sync", + }, + { + Name: deregisterName, + Help: "Increments for each service deregistered from Consul via catalog sync", + }, + { + Name: registerErrorName, + Help: "Increments whenever a Consul API client returns an error for a catalog sync register request", + }, + { + Name: deregisterErrorName, + Help: "Increments whenever a Consul API client returns an error for a catalog sync deregister request request", + }, +} + +var SyncCatalogGauge = []prometheus.GaugeDefinition{ + { + Name: syncCatalogStatus, + Help: "Status of the Consul Client endpoint. 1 for connected, 0 for disconnected", + }, +} + const ( // ConsulSyncPeriod is how often the syncer will attempt to // reconcile the expected service states with the remote Consul server. @@ -101,6 +138,8 @@ type ConsulSyncer struct { // watchers is all namespaces mapped to a map of Consul service // names mapped to a cancel function for watcher routines watchers map[string]map[string]context.CancelFunc + + PrometheusSink *prometheus.PrometheusSink } // Sync implements Syncer. @@ -433,14 +472,30 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { "node-name", r.Node, "service-id", r.ServiceID, "service-consul-namespace", r.Namespace) + _, err = consulClient.Catalog().Deregister(r, nil) if err != nil { + // metric count for error deregistering k8s services from Consul + labels := []metrics.Label{ + {Name: "error", Value: err.Error()}, + } + s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels) + s.Log.Warn("error deregistering service", "node-name", r.Node, "service-id", r.ServiceID, "service-consul-namespace", r.Namespace, "err", err) + continue + } + + // metric count for deregistering k8s services from Consul + labels := []metrics.Label{ + {Name: "id", Value: r.ServiceID}, + {Name: "node", Value: r.Node}, + {Name: "namespace", Value: r.Namespace}, } + s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, labels) } // Always clear deregistrations, they'll repopulate if we had errors @@ -465,6 +520,14 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { // Register the service. _, err = consulClient.Catalog().Register(r, nil) if err != nil { + // metric count for error syncing K8S services to Consul + label := []metrics.Label{ + {Name: "error", Value: err.Error()}, + } + s.PrometheusSink.IncrCounterWithLabels(registerErrorName, 1, label) + // Set to 0 if the endpoint is down or returns an error + s.PrometheusSink.SetGauge(syncCatalogStatus, 0) + s.Log.Warn("error registering service", "node-name", r.Node, "service-name", r.Service.Service, @@ -478,6 +541,25 @@ func (s *ConsulSyncer) syncFull(ctx context.Context) { "service-name", r.Service.Service, "consul-namespace-name", r.Service.Namespace, "service", r.Service) + + // metric count and service metadata syncing k8s services to Consul + labels := []metrics.Label{ + {Name: "id", Value: r.Service.ID}, + {Name: "service", Value: r.Service.Service}, + {Name: "node", Value: r.Node}, + {Name: "namespace", Value: r.Service.Namespace}, + {Name: "datacenter", Value: r.Datacenter}, + } + + if val, exists := r.Service.Meta["external-k8s-ref-name"]; exists && val != "" { + labels = append(labels, metrics.Label{Name: "external_k8s_ref_name", Value: val}) + } + if r.Check != nil { + labels = append(labels, metrics.Label{Name: "status", Value: r.Check.Status}) + } + s.PrometheusSink.IncrCounterWithLabels(registerName, 1, labels) + // Set to 1 if the endpoint is healthy + s.PrometheusSink.SetGauge(syncCatalogStatus, 1) } } } diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index ab2cfee0a2..3f0d19ee50 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" @@ -286,6 +287,7 @@ func testConsulSyncerWithConfig(testClient *test.TestServerClient, configurator ServicePollPeriod: 50 * time.Millisecond, ConsulK8STag: TestConsulK8STag, ConsulNodeName: ConsulSyncNodeName, + PrometheusSink: &prometheus.PrometheusSink{}, } configurator(s) s.init() diff --git a/control-plane/catalog/to-k8s/sink.go b/control-plane/catalog/to-k8s/sink.go index 6e201253df..794ba13431 100644 --- a/control-plane/catalog/to-k8s/sink.go +++ b/control-plane/catalog/to-k8s/sink.go @@ -9,6 +9,9 @@ import ( "sync" "time" + "github.com/armon/go-metrics" + "github.com/armon/go-metrics/prometheus" + metricsutil "github.com/hashicorp/consul-k8s/control-plane/catalog/metrics" "github.com/hashicorp/consul-k8s/control-plane/helper/coalesce" "github.com/hashicorp/go-hclog" apiv1 "k8s.io/api/core/v1" @@ -19,6 +22,33 @@ import ( "k8s.io/client-go/tools/cache" ) +var ( + baseName = []string{"consul", "sync_catalog", "to_k8s"} + registerName = append(baseName, "register") + deregisterName = append(baseName, "deregister") + registerErrorName = append(baseName, "register", "error") + deregisterErrorName = append(baseName, "deregister", "error") +) + +var SyncToK8sCounters = []prometheus.CounterDefinition{ + { + Name: registerName, + Help: "Increments for each Consul service registered to kubernetes via catalog sync", + }, + { + Name: deregisterName, + Help: "Increments for each Consul service deregistered from kubernetes via catalog sync", + }, + { + Name: registerErrorName, + Help: "Increments whenever a Consul API client returns an error for a catalog sync register request", + }, + { + Name: deregisterErrorName, + Help: "Increments whenever a Consul API client returns an error for a catalog sync deregister request request", + }, +} + const ( // K8SQuietPeriod is the time to wait for no service changes before syncing. K8SQuietPeriod = 1 * time.Second @@ -83,6 +113,8 @@ type K8SSink struct { // It's populated from Kubernetes data. serviceMapConsul map[string]*apiv1.Service triggerCh chan struct{} + + PrometheusSink *prometheus.PrometheusSink } // SetServices implements Sink. @@ -232,7 +264,15 @@ func (s *K8SSink) Run(ch <-chan struct{}) { for _, name := range delete { if err := svcClient.Delete(s.Ctx, name, metav1.DeleteOptions{}); err != nil { s.Log.Warn("error deleting service", "name", name, "error", err) + + // metric count for error syncing Consul services to K8s + labels := []metrics.Label{ + {Name: "error", Value: err.Error()}, + } + s.PrometheusSink.IncrCounterWithLabels(deregisterErrorName, 1, labels) } + // metric count for deregistering Consul services from k8s + s.PrometheusSink.IncrCounterWithLabels(deregisterName, 1, metricsutil.ServiceNameLabel(name)) } for _, svc := range update { @@ -246,7 +286,13 @@ func (s *K8SSink) Run(ch <-chan struct{}) { _, err := svcClient.Create(s.Ctx, svc, metav1.CreateOptions{}) if err != nil { s.Log.Warn("error creating service", "name", svc.Name, "error", err) + labels := []metrics.Label{ + {Name: "error", Value: err.Error()}, + } + s.PrometheusSink.IncrCounterWithLabels(registerErrorName, 1, labels) } + // metric count for registering Consul services to k8s + s.PrometheusSink.IncrCounterWithLabels(registerName, 1, metricsutil.ServiceNameLabel(svc.Name)) } } } diff --git a/control-plane/catalog/to-k8s/sink_test.go b/control-plane/catalog/to-k8s/sink_test.go index cfba502268..40b2d14df4 100644 --- a/control-plane/catalog/to-k8s/sink_test.go +++ b/control-plane/catalog/to-k8s/sink_test.go @@ -7,6 +7,7 @@ import ( "context" "testing" + "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/consul-k8s/control-plane/helper/controller" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" @@ -398,9 +399,10 @@ func TestK8SSink_deleteReconcileLocal(t *testing.T) { func testSink(t *testing.T, client kubernetes.Interface) (*K8SSink, func()) { sink := &K8SSink{ - Client: client, - Log: hclog.Default(), - Ctx: context.Background(), + Client: client, + Log: hclog.Default(), + Ctx: context.Background(), + PrometheusSink: &prometheus.PrometheusSink{}, } closer := controller.TestControllerRun(sink) diff --git a/control-plane/controllers/configentries/configentry_controller_test.go b/control-plane/controllers/configentries/configentry_controller_test.go index 7002b20641..169709494d 100644 --- a/control-plane/controllers/configentries/configentry_controller_test.go +++ b/control-plane/controllers/configentries/configentry_controller_test.go @@ -1788,7 +1788,7 @@ func TestConfigEntryControllers_doesNotCreateUnownedConfigEntry(t *testing.T) { resp, err := reconciler.Reconcile(ctx, ctrl.Request{ NamespacedName: namespacedName, }) - req.Equal(err, c.expErr) + req.Equal(c.expErr, err) req.False(resp.Requeue) // Now check that the object in Consul is as expected. diff --git a/control-plane/go.mod b/control-plane/go.mod index 702f505d13..3e23f2a593 100644 --- a/control-plane/go.mod +++ b/control-plane/go.mod @@ -3,6 +3,7 @@ module github.com/hashicorp/consul-k8s/control-plane replace github.com/hashicorp/consul-k8s/version => ../version require ( + github.com/armon/go-metrics v0.4.1 github.com/cenkalti/backoff v2.2.1+incompatible github.com/containernetworking/cni v1.1.2 github.com/deckarep/golang-set v1.7.1 @@ -32,6 +33,7 @@ require ( github.com/mitchellh/cli v1.1.0 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.5.0 + github.com/prometheus/client_golang v1.16.0 github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.25.0 golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 @@ -64,7 +66,6 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/armon/go-metrics v0.4.1 // indirect github.com/armon/go-radix v1.0.0 // indirect github.com/aws/aws-sdk-go v1.44.262 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -127,7 +128,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/posener/complete v1.2.3 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/control-plane/subcommand/common/metrics_util.go b/control-plane/subcommand/common/metrics_util.go new file mode 100644 index 0000000000..d3866fdeef --- /dev/null +++ b/control-plane/subcommand/common/metrics_util.go @@ -0,0 +1,29 @@ +package common + +import "strconv" + +func ParseScrapePort(v string) (int, bool) { + port, err := strconv.Atoi(v) + if err != nil { + // we only use the port if it's actually valid + return 0, false + } + if port < 1024 || port > 65535 { + return 0, false + } + return port, true +} + +func GetScrapePath(v string) (string, bool) { + return v, v != "" +} + +func GetMetricsEnabled(v string) (bool, bool) { + if v == "true" { + return true, true + } + if v == "false" { + return false, true + } + return false, false +} diff --git a/control-plane/subcommand/gateway-resources/command.go b/control-plane/subcommand/gateway-resources/command.go index a10473239a..7218644017 100644 --- a/control-plane/subcommand/gateway-resources/command.go +++ b/control-plane/subcommand/gateway-resources/command.go @@ -11,7 +11,6 @@ import ( "fmt" "io" "os" - "strconv" "sync" "time" @@ -30,6 +29,7 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/api-gateway/common" "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" "github.com/hashicorp/consul-k8s/control-plane/subcommand" + metricsutil "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" "github.com/hashicorp/consul-k8s/control-plane/subcommand/flags" ) @@ -249,13 +249,13 @@ func (c *Command) Run(args []string) int { }, } - if metricsEnabled, isSet := getMetricsEnabled(c.flagEnableMetrics); isSet { + if metricsEnabled, isSet := metricsutil.GetMetricsEnabled(c.flagEnableMetrics); isSet { classConfig.Spec.Metrics.Enabled = &metricsEnabled - if port, isValid := getScrapePort(c.flagMetricsPort); isValid { + if port, isValid := metricsutil.ParseScrapePort(c.flagMetricsPort); isValid { port32 := int32(port) classConfig.Spec.Metrics.Port = &port32 } - if path, isSet := getScrapePath(c.flagMetricsPath); isSet { + if path, isSet := metricsutil.GetScrapePath(c.flagMetricsPath); isSet { classConfig.Spec.Metrics.Path = &path } } @@ -329,13 +329,13 @@ func (c *Command) validateFlags() error { } if c.flagEnableMetrics != "" { - if _, valid := getMetricsEnabled(c.flagEnableMetrics); !valid { + if _, valid := metricsutil.GetMetricsEnabled(c.flagEnableMetrics); !valid { return errors.New("-enable-metrics must be either 'true' or 'false'") } } if c.flagMetricsPort != "" { - if _, valid := getScrapePort(c.flagMetricsPort); !valid { + if _, valid := metricsutil.ParseScrapePort(c.flagMetricsPort); !valid { return errors.New("-metrics-port must be a valid unprivileged port number") } } @@ -446,35 +446,6 @@ func exponentialBackoffWithMaxIntervalAndTime() *backoff.ExponentialBackOff { return backoff } -func getScrapePort(v string) (int, bool) { - port, err := strconv.Atoi(v) - if err != nil { - // we only use the port if it's actually valid - return 0, false - } - if port < 1024 || port > 65535 { - return 0, false - } - return port, true -} - -func getScrapePath(v string) (string, bool) { - if v == "" { - return "", false - } - return v, true -} - -func getMetricsEnabled(v string) (bool, bool) { - if v == "true" { - return true, true - } - if v == "false" { - return false, true - } - return false, false -} - func nonZeroOrNil(v int) *int32 { if v == 0 { return nil diff --git a/control-plane/subcommand/sync-catalog/command.go b/control-plane/subcommand/sync-catalog/command.go index e461121f3d..004f1bea17 100644 --- a/control-plane/subcommand/sync-catalog/command.go +++ b/control-plane/subcommand/sync-catalog/command.go @@ -5,6 +5,7 @@ package synccatalog import ( "context" + "errors" "flag" "fmt" "net/http" @@ -15,20 +16,24 @@ import ( "syscall" "time" + "github.com/armon/go-metrics/prometheus" mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-server-connection-manager/discovery" "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" + "github.com/prometheus/client_golang/prometheus/promhttp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "github.com/hashicorp/consul-k8s/control-plane/catalog/metrics" catalogtoconsul "github.com/hashicorp/consul-k8s/control-plane/catalog/to-consul" catalogtok8s "github.com/hashicorp/consul-k8s/control-plane/catalog/to-k8s" "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/controller" "github.com/hashicorp/consul-k8s/control-plane/subcommand" "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" + metricsutil "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" "github.com/hashicorp/consul-k8s/control-plane/subcommand/flags" ) @@ -68,6 +73,12 @@ type Command struct { flagK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring flagCrossNamespaceACLPolicy string // The name of the ACL policy to add to every created namespace if ACLs are enabled + // Metrics settings. + flagEnableMetrics bool + flagMetricsPort string + flagMetricsPath string + flagMetricsRetentionTime string + // Flags to support Kubernetes Ingress resources flagEnableIngress bool // Register services using the hostname from an ingress resource flagLoadBalancerIPs bool // Use the load balancer IP of an ingress resource instead of the hostname @@ -78,11 +89,12 @@ type Command struct { // consul-server-connection-manager has finished initial initialization. ready bool - once sync.Once - sigCh chan os.Signal - help string - logger hclog.Logger - connMgr consul.ServerConnectionManager + once sync.Once + sigCh chan os.Signal + help string + logger hclog.Logger + connMgr consul.ServerConnectionManager + prometheusSink *prometheus.PrometheusSink } func (c *Command) init() { @@ -156,6 +168,11 @@ func (c *Command) init() { "[Enterprise Only] Name of the ACL policy to attach to all created Consul namespaces to allow service "+ "discovery across Consul namespaces. Only necessary if ACLs are enabled.") + c.flags.BoolVar(&c.flagEnableMetrics, "enable-metrics", false, "set this flag to enable metrics collection") + c.flags.StringVar(&c.flagMetricsPath, "metrics-path", "/metrics", "specify to set the path used for metrics scraping") + c.flags.StringVar(&c.flagMetricsPort, "metrics-port", "20300", "specify to set the port used for metrics scraping") + c.flags.StringVar(&c.flagMetricsRetentionTime, "prometheus-retention-time", "1m", "configures the retention time for metrics in the Prometheus sink") + c.flags.BoolVar(&c.flagEnableIngress, "enable-ingress", false, "[Enterprise Only] Enables namespaces, in either a single Consul namespace or mirrored.") c.flags.BoolVar(&c.flagLoadBalancerIPs, "loadBalancer-ips", false, @@ -259,6 +276,17 @@ func (c *Command) Run(args []string) int { // it will be the only allowed namespace allowSet = mapset.NewSet(c.flagK8SSourceNamespace) } + + metricsConfig := metrics.SyncCatalogMetricsConfig(c.flagEnableMetrics, c.flagMetricsPort, c.flagMetricsPath) + metricsConfig.PrometheusMetricsRetentionTime = c.flagMetricsRetentionTime + + // Create the metrics sink + sink, err := c.recordMetrics() + if err != nil { + c.logger.Error("Prometheus sink not initialized, metrics cannot be displayed", "error", err) + } + c.prometheusSink = sink + c.logger.Info("K8s namespace syncing configuration", "k8s namespaces allowed to be synced", allowSet, "k8s namespaces denied from syncing", denySet) @@ -279,6 +307,7 @@ func (c *Command) Run(args []string) int { ServicePollPeriod: c.flagConsulWritePeriod * 2, ConsulK8STag: c.flagConsulK8STag, ConsulNodeName: c.flagConsulNodeName, + PrometheusSink: c.prometheusSink, } go syncer.Run(ctx) @@ -306,6 +335,7 @@ func (c *Command) Run(args []string) int { ConsulNodeName: c.flagConsulNodeName, EnableIngress: c.flagEnableIngress, SyncLoadBalancerIPs: c.flagLoadBalancerIPs, + MetricsConfig: metricsConfig, }, } @@ -320,10 +350,11 @@ func (c *Command) Run(args []string) int { var toK8SCh chan struct{} if c.flagToK8S { sink := &catalogtok8s.K8SSink{ - Client: c.clientset, - Namespace: c.flagK8SWriteNamespace, - Log: c.logger.Named("to-k8s/sink"), - Ctx: ctx, + Client: c.clientset, + Namespace: c.flagK8SWriteNamespace, + Log: c.logger.Named("to-k8s/sink"), + Ctx: ctx, + PrometheusSink: c.prometheusSink, } source := &catalogtok8s.Source{ @@ -362,6 +393,18 @@ func (c *Command) Run(args []string) int { } }() + // Start metrics handler + go func() { + mux := http.NewServeMux() + mux.Handle(c.flagMetricsPath, c.authorizeMiddleware()(promhttp.Handler())) + var handler http.Handler = mux + + c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagMetricsPort)) + if err := http.ListenAndServe(fmt.Sprintf(":%s", c.flagMetricsPort), handler); err != nil { + c.UI.Error(fmt.Sprintf("Error listening: %s", err)) + } + }() + select { // Unexpected exit case <-toConsulCh: @@ -438,9 +481,58 @@ func (c *Command) validateFlags() error { ) } + if c.flagMetricsPort != "" { + if _, valid := metricsutil.ParseScrapePort(c.flagMetricsPort); !valid { + return errors.New("-metrics-port must be a valid unprivileged port number") + } + } + return nil } +func (c *Command) recordMetrics() (*prometheus.PrometheusSink, error) { + var err error + + duration, err := time.ParseDuration(c.flagMetricsRetentionTime) + if err != nil { + return &prometheus.PrometheusSink{}, err + } + + var counters = [][]prometheus.CounterDefinition{ + catalogtoconsul.SyncToConsulCounters, + catalogtok8s.SyncToK8sCounters, + } + + var counterDefs []prometheus.CounterDefinition + + for _, counter := range counters { + counterDefs = append(counterDefs, counter...) + } + + opts := prometheus.PrometheusOpts{ + Expiration: duration, + CounterDefinitions: counterDefs, + GaugeDefinitions: catalogtoconsul.SyncCatalogGauge, + } + + sink, err := prometheus.NewPrometheusSinkFrom(opts) + if err != nil { + return &prometheus.PrometheusSink{}, err + } + + return sink, nil +} + +// authorizeMiddleware validates the token and returns http handler. +func (c *Command) authorizeMiddleware() func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // TO-DO: Validate the token and proceed to the next handler + next.ServeHTTP(w, r) + }) + } +} + const synopsis = "Sync Kubernetes services and Consul services." const help = ` Usage: consul-k8s-control-plane sync-catalog [options] diff --git a/control-plane/subcommand/sync-catalog/command_ent_test.go b/control-plane/subcommand/sync-catalog/command_ent_test.go index 8af712dcbe..f8bdccda6b 100644 --- a/control-plane/subcommand/sync-catalog/command_ent_test.go +++ b/control-plane/subcommand/sync-catalog/command_ent_test.go @@ -54,15 +54,13 @@ func TestRun_ToConsulSingleDestinationNamespace(t *testing.T) { // Run the command. ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - connMgr: testClient.Watcher, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name(), - Level: hclog.Debug, - }), - } + connMgr := testClient.Watcher + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name(), + Level: hclog.Debug, + }) + + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) // Create two services in k8s in default and foo namespaces. _, err := k8s.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), lbService("default", "1.1.1.1"), metav1.CreateOptions{}) @@ -79,7 +77,7 @@ func TestRun_ToConsulSingleDestinationNamespace(t *testing.T) { _, err = k8s.CoreV1().Services("foo").Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) require.NoError(tt, err) - exitChan := runCommandAsynchronously(&cmd, []string{ + exitChan := runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), "-consul-write-interval", "500ms", @@ -90,7 +88,7 @@ func TestRun_ToConsulSingleDestinationNamespace(t *testing.T) { "-allow-k8s-namespace=*", "-add-k8s-namespace-suffix=false", }) - defer stopCommand(tt, &cmd, exitChan) + defer stopCommand(tt, cmd, exitChan) timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond} retry.RunWith(timer, tt, func(r *retry.R) { @@ -190,15 +188,13 @@ func TestRun_ToConsulMirroringNamespaces(t *testing.T) { // Run the command. ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - connMgr: testClient.Watcher, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name(), - Level: hclog.Debug, - }), - } + connMgr := testClient.Watcher + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name(), + Level: hclog.Debug, + }) + + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) // Create two services in k8s in default and foo namespaces. _, err := k8s.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), lbService("default", "1.1.1.1"), metav1.CreateOptions{}) @@ -226,8 +222,8 @@ func TestRun_ToConsulMirroringNamespaces(t *testing.T) { "-enable-k8s-namespace-mirroring", "-k8s-namespace-mirroring-prefix", c.MirroringPrefix, }, c.ExtraFlags...) - exitChan := runCommandAsynchronously(&cmd, args) - defer stopCommand(tt, &cmd, exitChan) + exitChan := runCommandAsynchronously(cmd, args) + defer stopCommand(tt, cmd, exitChan) timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond} retry.RunWith(timer, tt, func(r *retry.R) { @@ -487,16 +483,15 @@ func TestRun_ToConsulChangingNamespaceFlags(t *testing.T) { // Run the first command. { - firstCmd := Command{ - UI: ui, - clientset: k8s, - connMgr: testClient.Watcher, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name() + "-firstrun", - Level: hclog.Debug, - }), - } - exitChan := runCommandAsynchronously(&firstCmd, append(commonArgs, c.FirstRunFlags...)) + connMgr := testClient.Watcher + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name() + "-firstrun", + Level: hclog.Debug, + }) + + firstCmd := NewTestCommand(t, ui, k8s, logger, connMgr) + + exitChan := runCommandAsynchronously(firstCmd, append(commonArgs, c.FirstRunFlags...)) // Wait until the expected services are synced. timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond} @@ -511,23 +506,21 @@ func TestRun_ToConsulChangingNamespaceFlags(t *testing.T) { require.Equal(r, instances[0].ServiceName, svcName) } }) - stopCommand(tt, &firstCmd, exitChan) + stopCommand(tt, firstCmd, exitChan) } tt.Log("first command run complete") // Run the second command. { - secondCmd := Command{ - UI: ui, - clientset: k8s, - connMgr: testClient.Watcher, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name() + "-secondrun", - Level: hclog.Debug, - }), - } - exitChan := runCommandAsynchronously(&secondCmd, append(commonArgs, c.SecondRunFlags...)) - defer stopCommand(tt, &secondCmd, exitChan) + connMgr := testClient.Watcher + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name() + "-secondrun", + Level: hclog.Debug, + }) + + secondCmd := NewTestCommand(t, ui, k8s, logger, connMgr) + exitChan := runCommandAsynchronously(secondCmd, append(commonArgs, c.SecondRunFlags...)) + defer stopCommand(tt, secondCmd, exitChan) // Wait until the expected services are synced and the old ones // deleted. @@ -653,15 +646,13 @@ func TestRun_ToConsulNamespacesACLs(t *testing.T) { // Set up the sync command ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - connMgr: testClient.Watcher, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name(), - Level: hclog.Debug, - }), - } + connMgr := testClient.Watcher + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name(), + Level: hclog.Debug, + }) + + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) // Set flags and run the command commonArgs := []string{ @@ -674,8 +665,8 @@ func TestRun_ToConsulNamespacesACLs(t *testing.T) { "-enable-namespaces", "-consul-cross-namespace-acl-policy=cross-namespace-policy", } - exitChan := runCommandAsynchronously(&cmd, append(commonArgs, c.Flags...)) - defer stopCommand(tt, &cmd, exitChan) + exitChan := runCommandAsynchronously(cmd, append(commonArgs, c.Flags...)) + defer stopCommand(tt, cmd, exitChan) // Check the namespaces are created correctly timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond} diff --git a/control-plane/subcommand/sync-catalog/command_test.go b/control-plane/subcommand/sync-catalog/command_test.go index 9b7365e801..ca2aca4e37 100644 --- a/control-plane/subcommand/sync-catalog/command_test.go +++ b/control-plane/subcommand/sync-catalog/command_test.go @@ -7,18 +7,22 @@ import ( "context" "os" "strconv" + "sync" "syscall" "testing" "time" + "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) @@ -45,9 +49,8 @@ func TestRun_FlagValidation(t *testing.T) { for _, c := range cases { t.Run(c.ExpErr, func(t *testing.T) { ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } + + cmd := NewTestCommand(t, ui, nil, nil, nil) responseCode := cmd.Run(c.Flags) require.Equal(t, 1, responseCode, ui.ErrorWriter.String()) require.Contains(t, ui.ErrorWriter.String(), c.ExpErr) @@ -60,24 +63,21 @@ func TestRun_Defaults_SyncsConsulServiceToK8s(t *testing.T) { t.Parallel() k8s, testClient := completeSetup(t) + ui := cli.NewMockUi() + logger := hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }) + connMgr := testClient.Watcher // Run the command. - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - connMgr: testClient.Watcher, - } + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) - exitChan := runCommandAsynchronously(&cmd, []string{ + exitChan := runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), }) - defer stopCommand(t, &cmd, exitChan) + defer stopCommand(t, cmd, exitChan) retry.Run(t, func(r *retry.R) { serviceList, err := k8s.CoreV1().Services(metav1.NamespaceDefault).List(context.Background(), metav1.ListOptions{}) @@ -98,22 +98,21 @@ func testSignalHandling(sig os.Signal) func(*testing.T) { return func(t *testing.T) { k8s, testClient := completeSetup(t) - // Run the command. ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - connMgr: testClient.Watcher, - } + logger := hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + + // Run the command. + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) - exitChan := runCommandAsynchronously(&cmd, []string{ + exitChan := runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), }) + cmd.sendSignal(sig) // Assert that it exits cleanly or timeout. @@ -139,31 +138,29 @@ func TestRun_ToConsulWithAddK8SNamespaceSuffix(t *testing.T) { k8s, testClient := completeSetup(t) consulClient := testClient.APIClient - // Run the command. ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - flagAllowK8sNamespacesList: []string{"*"}, - connMgr: testClient.Watcher, - } + logger := hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + + // Run the command. + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) + cmd.flagAllowK8sNamespacesList = []string{"*"} // create a service in k8s _, err := k8s.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) require.NoError(t, err) - exitChan := runCommandAsynchronously(&cmd, []string{ + exitChan := runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), // change the write interval, so we can see changes in Consul quicker "-consul-write-interval", "100ms", "-add-k8s-namespace-suffix", }) - defer stopCommand(t, &cmd, exitChan) + defer stopCommand(t, cmd, exitChan) retry.Run(t, func(r *retry.R) { services, _, err := consulClient.Catalog().Services(nil) @@ -179,27 +176,23 @@ func TestCommand_Run_ToConsulChangeAddK8SNamespaceSuffixToTrue(t *testing.T) { t.Parallel() k8s, testClient := completeSetup(t) - consulClient := testClient.APIClient - // Run the command. ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - flagAllowK8sNamespacesList: []string{"*"}, - connMgr: testClient.Watcher, - } + logger := hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) + cmd.flagAllowK8sNamespacesList = []string{"*"} // create a service in k8s _, err := k8s.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) require.NoError(t, err) - exitChan := runCommandAsynchronously(&cmd, []string{ + exitChan := runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), "-consul-write-interval", "100ms", @@ -212,16 +205,16 @@ func TestCommand_Run_ToConsulChangeAddK8SNamespaceSuffixToTrue(t *testing.T) { require.Contains(r, services, "foo") }) - stopCommand(t, &cmd, exitChan) + stopCommand(t, cmd, exitChan) // restart sync with -add-k8s-namespace-suffix - exitChan = runCommandAsynchronously(&cmd, []string{ + exitChan = runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), "-consul-write-interval", "100ms", "-add-k8s-namespace-suffix", }) - defer stopCommand(t, &cmd, exitChan) + defer stopCommand(t, cmd, exitChan) // check that the name of the service is now namespaced retry.Run(t, func(r *retry.R) { @@ -244,16 +237,13 @@ func TestCommand_Run_ToConsulTwoServicesSameNameDifferentNamespace(t *testing.T) // Run the command. ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - flagAllowK8sNamespacesList: []string{"*"}, - connMgr: testClient.Watcher, - } + logger := hclog.New(&hclog.LoggerOptions{ + Name: t.Name(), + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) + cmd.flagAllowK8sNamespacesList = []string{"*"} // create two services in k8s _, err := k8s.CoreV1().Services("bar").Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) @@ -262,13 +252,13 @@ func TestCommand_Run_ToConsulTwoServicesSameNameDifferentNamespace(t *testing.T) _, err = k8s.CoreV1().Services("baz").Create(context.Background(), lbService("foo", "2.2.2.2"), metav1.CreateOptions{}) require.NoError(t, err) - exitChan := runCommandAsynchronously(&cmd, []string{ + exitChan := runCommandAsynchronously(cmd, []string{ "-addresses", "127.0.0.1", "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), "-consul-write-interval", "100ms", "-add-k8s-namespace-suffix", }) - defer stopCommand(t, &cmd, exitChan) + defer stopCommand(t, cmd, exitChan) // check that the name of the service is namespaced retry.Run(t, func(r *retry.R) { @@ -366,17 +356,16 @@ func TestRun_ToConsulAllowDenyLists(t *testing.T) { // Run the command ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name(), - Level: hclog.Debug, - }), - connMgr: testClient.Watcher, - } - exitChan := runCommandAsynchronously(&cmd, flags) - defer stopCommand(tt, &cmd, exitChan) + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name(), + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + + cmd := NewTestCommand(t, ui, k8s, logger, connMgr) + + exitChan := runCommandAsynchronously(cmd, flags) + defer stopCommand(tt, cmd, exitChan) retry.Run(tt, func(r *retry.R) { svcs, _, err := consulClient.Catalog().Services(nil) @@ -509,16 +498,14 @@ func TestRun_ToConsulChangingFlags(t *testing.T) { // Run the first command. { - firstCmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name() + "-firstrun", - Level: hclog.Debug, - }), - connMgr: testClient.Watcher, - } - exitChan := runCommandAsynchronously(&firstCmd, append(commonArgs, c.FirstRunFlags...)) + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name() + "-firstrun", + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + firstCmd := NewTestCommand(t, ui, k8s, logger, connMgr) + + exitChan := runCommandAsynchronously(firstCmd, append(commonArgs, c.FirstRunFlags...)) // Wait until the expected services are synced. retry.Run(tt, func(r *retry.R) { @@ -529,23 +516,22 @@ func TestRun_ToConsulChangingFlags(t *testing.T) { require.Equal(r, instances[0].ServiceName, svcName) } }) - stopCommand(tt, &firstCmd, exitChan) + stopCommand(tt, firstCmd, exitChan) } tt.Log("first command run complete") // Run the second command. { - secondCmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: tt.Name() + "-secondrun", - Level: hclog.Debug, - }), - connMgr: testClient.Watcher, - } - exitChan := runCommandAsynchronously(&secondCmd, append(commonArgs, c.SecondRunFlags...)) - defer stopCommand(tt, &secondCmd, exitChan) + + logger := hclog.New(&hclog.LoggerOptions{ + Name: tt.Name() + "-secondrun", + Level: hclog.Debug, + }) + connMgr := testClient.Watcher + secondCmd := NewTestCommand(t, ui, k8s, logger, connMgr) + + exitChan := runCommandAsynchronously(secondCmd, append(commonArgs, c.SecondRunFlags...)) + defer stopCommand(tt, secondCmd, exitChan) // Wait until the expected services are synced and the old ones // deleted. @@ -630,3 +616,30 @@ func lbService(name, lbIP string) *apiv1.Service { }, } } + +var ( + prometheusSinkOnce sync.Once + prometheusSink *prometheus.PrometheusSink +) + +func getPrometheusSink(t *testing.T) *prometheus.PrometheusSink { + var err error + prometheusSinkOnce.Do(func() { + prometheusSink, err = prometheus.NewPrometheusSinkFrom(prometheus.PrometheusOpts{}) + require.NoError(t, err) + }) + + return prometheusSink +} + +func NewTestCommand(t *testing.T, ui *cli.MockUi, client kubernetes.Interface, logger hclog.Logger, connMgr consul.ServerConnectionManager) *Command { + sink := getPrometheusSink(t) + return &Command{ + UI: ui, + clientset: client, + logger: logger, + connMgr: connMgr, + flagMetricsRetentionTime: "1m", + prometheusSink: sink, + } +}