diff --git a/.changelog/4266.txt b/.changelog/4266.txt new file mode 100644 index 0000000000..44ea4ecf1c --- /dev/null +++ b/.changelog/4266.txt @@ -0,0 +1,3 @@ +```release-note:bug +sync-catalog: fix infinite retry loop when the catalog fails to connect to consul-server during the sync process +``` diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 9f1df18ba6..93d158fdaa 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -11,10 +11,11 @@ import ( "github.com/cenkalti/backoff" mapset "github.com/deckarep/golang-set" - "github.com/hashicorp/consul-k8s/control-plane/consul" - "github.com/hashicorp/consul-k8s/control-plane/namespaces" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/namespaces" ) const ( @@ -173,10 +174,31 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) { // because we have no tracked services in our maps yet. <-s.initialSync + // Run immediately the first time, then wait for the retry period + waitCh := time.After(0) + waitBeforeRetry := s.SyncPeriod / 4 + + for { + select { + case <-waitCh: + s.deregisterRemovedServices(ctx) + waitCh = time.After(waitBeforeRetry) + case <-ctx.Done(): + return + } + } +} + +// deregisterRemovedServices queries the Consul catalog for all services and +// schedules for deregistration any that no longer have a corresponding k8s +// service. +// +// This function is very similar to [deregisterRemovedService] but handles the case +// where the ServiceWatcher has been terminated but the service hasn't been deregistered +// yet. +func (s *ConsulSyncer) deregisterRemovedServices(ctx context.Context) { opts := &api.QueryOptions{ AllowStale: true, - WaitIndex: 1, - WaitTime: 1 * time.Minute, Filter: fmt.Sprintf("\"%s\" in Tags", s.ConsulK8STag), } @@ -184,80 +206,59 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) { opts.Namespace = "*" } - // minWait is the minimum time to wait between scheduling service deletes. - // This prevents a lot of churn in services causing high CPU usage. - minWait := s.SyncPeriod / 4 - minWaitCh := time.After(0) - for { - // Create a new consul client. - consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr) - if err != nil { - s.Log.Error("failed to create Consul API client", "err", err) - return - } + consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr) + if err != nil { + s.Log.Error("failed to create Consul API client", "error", err) + return + } - var services *api.CatalogNodeServiceList - var meta *api.QueryMeta - err = backoff.Retry(func() error { - services, meta, err = consulClient.Catalog().NodeServiceList(s.ConsulNodeName, opts) - return err - }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) + // Limit our backoff so that we don't try forever with a bad client + b := backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewExponentialBackOff(), 5), ctx) + var services *api.CatalogNodeServiceList + err = backoff.Retry(func() error { + services, _, err = consulClient.Catalog().NodeServiceList(s.ConsulNodeName, opts) if err != nil { - s.Log.Warn("error querying services, will retry", "err", err) - } else { - s.Log.Debug("[watchReapableServices] services returned from catalog", - "services", services) - } - - // Wait our minimum time before continuing or retrying - select { - case <-minWaitCh: - if err != nil { - continue - } - - minWaitCh = time.After(minWait) - case <-ctx.Done(): - return + s.Log.Warn("error querying services, will retry", "error", err) + return err } - // Update our blocking index - opts.WaitIndex = meta.LastIndex - - // Lock so we can modify the stored state - s.lock.Lock() + return nil + }, b) + if err != nil { + return + } - // Go through the service array and find services that should be reaped - for _, service := range services.Services { - // Check that the namespace exists in the valid service names map - // before checking whether it contains the service - svcNs := service.Namespace - if !s.EnableNamespaces { - // Set namespace to empty when namespaces are not enabled. - svcNs = "" - } - if _, ok := s.serviceNames[svcNs]; ok { - // We only care if we don't know about this service at all. - if s.serviceNames[svcNs].Contains(service.Service) { - s.Log.Debug("[watchReapableServices] serviceNames contains service", - "namespace", svcNs, - "service-name", service.Service) - continue - } - } + // Lock so we can modify the stored state + s.lock.Lock() + defer s.lock.Unlock() - s.Log.Info("invalid service found, scheduling for delete", - "service-name", service.Service, "service-id", service.ID, "service-consul-namespace", svcNs) - if err = s.scheduleReapServiceLocked(service.Service, svcNs); err != nil { - s.Log.Info("error querying service for delete", - "service-name", service.Service, - "service-consul-namespace", svcNs, - "err", err) + // Go through the service array and find services that should be reaped + for _, service := range services.Services { + // Check that the namespace exists in the valid service names map + // before checking whether it contains the service + namespace := service.Namespace + if !s.EnableNamespaces { + // Set namespace to empty when namespaces are not enabled. + namespace = "" + } + if _, ok := s.serviceNames[namespace]; ok { + // We only care if we don't know about this service at all. + if s.serviceNames[namespace].Contains(service.Service) { + continue } } - s.lock.Unlock() + s.Log.Info("invalid service found, scheduling for delete", + "service-name", service.Service, "service-id", service.ID, "service-consul-namespace", namespace) + if err = s.scheduleReapServiceLocked(service.Service, namespace); err != nil { + s.Log.Info("error querying service for delete", + "service-name", service.Service, + "service-consul-namespace", namespace, + "err", err) + } } } @@ -267,72 +268,88 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name, namespace string) s.Log.Info("starting service watcher", "service-name", name, "service-consul-namespace", namespace) defer s.Log.Info("stopping service watcher", "service-name", name, "service-consul-namespace", namespace) + // Run immediately the first time, then wait for the retry period + waitCh := time.After(0) + waitBeforeRetry := s.SyncPeriod / 4 + for { select { + // Wait for our poll period + case <-waitCh: + s.deregisterRemovedService(ctx, name, namespace) + waitCh = time.After(waitBeforeRetry) // Quit if our context is over case <-ctx.Done(): return - - // Wait for our poll period - case <-time.After(s.SyncPeriod): } - // Set up query options - queryOpts := &api.QueryOptions{ - AllowStale: true, - } - if s.EnableNamespaces { - // Sets the Consul namespace to query the catalog - queryOpts.Namespace = namespace - } + } +} - // Create a new consul client. - consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr) +// deregisterRemovedService checks to see if a given service in the catalog +// has been removed from k8s. If it has, then the service is deregistered from +// the Consul catalog. +// +// This function is very similar to [deregisterRemovedServices] but is scoped to a single +// service that is currently being watched. +func (s *ConsulSyncer) deregisterRemovedService(ctx context.Context, name, namespace string) { + opts := &api.QueryOptions{ + AllowStale: true, + } + if s.EnableNamespaces { + opts.Namespace = namespace + } + + consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr) + if err != nil { + s.Log.Error("failed to create Consul API client; will retry", "err", err) + return + } + + // Limit our backoff so that we don't try forever with a bad client + b := backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewExponentialBackOff(), 5), ctx) + + var services []*api.CatalogService + err = backoff.Retry(func() error { + services, _, err = consulClient.Catalog().Service(name, s.ConsulK8STag, opts) if err != nil { - s.Log.Error("failed to create Consul API client; will retry", "err", err) - continue - } - // Wait for service changes - var services []*api.CatalogService - err = backoff.Retry(func() error { - services, _, err = consulClient.Catalog().Service(name, s.ConsulK8STag, queryOpts) + s.Log.Warn("error querying service, will retry", "error", err) return err - }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) - if err != nil { - s.Log.Warn("error querying service, will retry", - "service-name", name, - "service-namespace", namespace, // will be "" if namespaces aren't enabled - "err", err) - continue } - // Lock so we can modify the set of actions to take - s.lock.Lock() + return nil + }, b) + if err != nil { + return + } - for _, svc := range services { - // Make sure the namespace exists before we run checks against it - if _, ok := s.serviceNames[namespace]; ok { - // If the service is valid and its info isn't nil, we don't deregister it - if s.serviceNames[namespace].Contains(svc.ServiceName) && s.namespaces[namespace][svc.ServiceID] != nil { - continue - } - } + // Lock so we can modify the set of actions to take + s.lock.Lock() + defer s.lock.Unlock() - s.deregs[svc.ServiceID] = &api.CatalogDeregistration{ - Node: svc.Node, - ServiceID: svc.ServiceID, - } - if s.EnableNamespaces { - s.deregs[svc.ServiceID].Namespace = namespace + for _, service := range services { + // Make sure the namespace exists before we run checks against it + if _, ok := s.serviceNames[namespace]; ok { + // If the service is valid and its info isn't nil, we don't deregister it + if s.serviceNames[namespace].Contains(service.ServiceName) && s.namespaces[namespace][service.ServiceID] != nil { + continue } - s.Log.Debug("[watchService] service being scheduled for deregistration", - "namespace", namespace, - "service name", svc.ServiceName, - "service id", svc.ServiceID, - "service dereg", s.deregs[svc.ServiceID]) } - s.lock.Unlock() + s.deregs[service.ServiceID] = &api.CatalogDeregistration{ + Node: service.Node, + ServiceID: service.ServiceID, + } + if s.EnableNamespaces { + s.deregs[service.ServiceID].Namespace = namespace + } + s.Log.Debug("[watchService] service being scheduled for deregistration", + "namespace", namespace, + "service name", service.ServiceName, + "service id", service.ServiceID, + "service dereg", s.deregs[service.ServiceID]) } }