From 9172a133a4245cf5f893f93619e39ee15c2d671c Mon Sep 17 00:00:00 2001 From: morvencao Date: Mon, 23 Dec 2024 06:09:05 +0000 Subject: [PATCH] event handler to event filter. Signed-off-by: morvencao --- cmd/maestro/servecmd/cmd.go | 12 +- cmd/maestro/server/controllers.go | 4 +- cmd/maestro/server/event_server.go | 12 +- cmd/maestro/server/grpc_broker.go | 111 ++++++++-------- pkg/api/event_instances.go | 5 +- pkg/controllers/event_filter.go | 96 +++++++++++++ pkg/controllers/event_filter_test.go | 150 +++++++++++++++++++++ pkg/controllers/event_handler.go | 177 ------------------------ pkg/controllers/event_handler_test.go | 185 -------------------------- pkg/controllers/framework.go | 35 +++-- pkg/controllers/framework_test.go | 163 +++++++++++++---------- pkg/dao/event_instance.go | 21 +-- pkg/dao/mocks/event.go | 11 +- pkg/dao/mocks/event_instance.go | 16 +-- test/helper.go | 8 +- test/integration/controller_test.go | 76 +++-------- 16 files changed, 470 insertions(+), 612 deletions(-) create mode 100644 pkg/controllers/event_filter.go create mode 100644 pkg/controllers/event_filter_test.go delete mode 100644 pkg/controllers/event_handler.go delete mode 100644 pkg/controllers/event_handler_test.go diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index d582e060..8f393b36 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -49,14 +49,11 @@ func runServer(cmd *cobra.Command, args []string) { // For gRPC, create a gRPC broker to handle resource spec and status events. // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. 
var eventServer server.EventServer - var eventHandler controllers.EventHandler + var eventFilter controllers.EventFilter if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) - eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent, - environments.Environment().Services.Events(), - dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory), - dao.NewInstanceDao(&environments.Environment().Database.SessionFactory)) + eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent) } else { klog.Info("Setting up message queue event server") var statusDispatcher dispatcher.Dispatcher @@ -74,14 +71,13 @@ func runServer(cmd *cobra.Command, args []string) { // Set the status dispatcher for the healthcheck server healthcheckServer.SetStatusDispatcher(statusDispatcher) eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) - eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory), - environments.Environment().Services.Events()) + eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory)) } // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - controllersServer := server.NewControllersServer(eventServer, eventHandler) + controllersServer := server.NewControllersServer(eventServer, eventFilter) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index 38d09ab2..ea54952d 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -11,10 +11,10 @@ import ( "github.com/openshift-online/maestro/pkg/logger" ) -func NewControllersServer(eventServer EventServer, 
eventHandler controllers.EventHandler) *ControllersServer { +func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer { s := &ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - eventHandler, + eventFilter, env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index 4ff89381..daaf5767 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -29,13 +29,13 @@ type EventServer interface { Start(ctx context.Context) // OnCreate handles the creation of a resource. - OnCreate(ctx context.Context, eventID, resourceID string) error + OnCreate(ctx context.Context, resourceID string) error // OnUpdate handles updates to a resource. - OnUpdate(ctx context.Context, eventID, resourceID string) error + OnUpdate(ctx context.Context, resourceID string) error // OnDelete handles the deletion of a resource. - OnDelete(ctx context.Context, eventID, resourceID string) error + OnDelete(ctx context.Context, resourceID string) error // OnStatusUpdate handles status update events for a resource. OnStatusUpdate(ctx context.Context, eventID, resourceID string) error @@ -117,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. 
-func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index d1574f1d..e4d9a655 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -29,7 +29,12 @@ import ( "github.com/openshift-online/maestro/pkg/services" ) -type resourceHandler func(res *api.Resource) error +// resourceHandler processes a resource spec by encoding it to a CloudEvent and sending it to the subscriber. +// It returns a bool indicating if the connection is closed and an error if one occurs. +// - Returns true and an error if the connection is closed. +// - Returns false and an error if encoding fails. +// - Returns false and nil if successful. +type resourceHandler func(res *api.Resource) (bool, error) // subscriber defines a subscriber that can receive and handle resource spec. 
type subscriber struct { @@ -182,36 +187,55 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv return fmt.Errorf("invalid subscription request: missing cluster name") } // register the cluster for subscription to the resource spec - subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error { + subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) (bool, error) { evt, err := encodeResourceSpec(res) if err != nil { - return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) + // return the error to requeue the event if encoding fails (e.g., due to invalid resource spec). + return false, fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) } - klog.V(4).Infof("send the event to spec subscribers, %s", evt) - // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf pbEvt := &pbv1.CloudEvent{} if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil { - return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err) + // return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent). + return false, fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err) } // send the cloudevent to the subscriber - // TODO: error handling to address errors beyond network issues. + klog.V(4).Infof("sending the event to spec subscribers, %s", evt) if err := subServer.Send(pbEvt); err != nil { klog.Errorf("failed to send grpc event, %v", err) - return err // return error to requeue the spec event + // Return true to ensure the subscriber will be unregistered when sending fails, which will close the subserver stream. + // See: https://github.com/grpc/grpc-go/blob/b615b35c4feb932a0ac658fb86b7127f10ef664e/stream.go#L1537 for more details. 
+ // Return the error without wrapping, as it contains the gRPC error code and message for future (TODO) handling beyond network issues. + // This will also not requeue the event, as the error will cause the connection to the subscriber to be closed. + // If the subscriber (agent) reconnects, rely on the agent's resync to retrieve the missing resource spec. + return true, err } - return nil + return false, nil }) select { case err := <-errChan: + // An error occurred while sending the event to the subscriber. + // This could be due to multiple reasons: + // see: https://grpc.io/docs/guides/error/ + // 1. general errors such as: deadline exceeded before returning the response. + // 2. network errors such as: connection closed by intermediate proxy. + // 3. protocol errors such as: compression error or flow control error. + // In all above cases, unregister the subscriber. + // TODO: unregister the subscriber if the error is a network error and the connection could be re-established. klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err) bkr.unregister(subscriberID) return err case <-subServer.Context().Done(): + // The context of the stream has been canceled or completed. + // This could happen if: + // - The client closed the connection or canceled the stream. + // - The server closed the stream, potentially due to a shutdown. + // Regardless of the reason, unregister the subscriber and stop processing. + // No error is returned here because the stream closure is expected. bkr.unregister(subscriberID) return nil } @@ -380,61 +404,53 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy } // handleRes publish the resource to the correct subscriber. 
-func (bkr *GRPCBroker) handleRes(resource *api.Resource) { +func (bkr *GRPCBroker) handleRes(resource *api.Resource) error { bkr.mu.RLock() defer bkr.mu.RUnlock() for _, subscriber := range bkr.subscribers { if subscriber.clusterName == resource.ConsumerName { - if err := subscriber.handler(resource); err != nil { - subscriber.errChan <- err + if isConnClosed, err := subscriber.handler(resource); err != nil { + if isConnClosed { + // if the connection is closed, write the error to the subscriber's error channel + // to ensure the subscriber is unregistered + subscriber.errChan <- err + } + // return the error to requeue the event if handling fails. + return err } } } -} - -// handleResEvent publish the resource to the correct subscriber and add the event instance record. -func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error { - bkr.handleRes(resource) - - // add the event instance record to mark the event has been processed by the current instance - if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: bkr.instanceID, - }); err != nil { - return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) - } - return nil } // OnCreate is called by the controller when a resource is created on the maestro server. -func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error { - resource, err := bkr.resourceService.Get(ctx, resourceID) +func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error { + resource, err := bkr.resourceService.Get(ctx, id) if err != nil { return err } - return bkr.handleResEvent(ctx, eventID, resource) + return bkr.handleRes(resource) } // OnUpdate is called by the controller when a resource is updated on the maestro server. 
-func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error { - resource, err := bkr.resourceService.Get(ctx, resourceID) +func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error { + resource, err := bkr.resourceService.Get(ctx, id) if err != nil { return err } - return bkr.handleResEvent(ctx, eventID, resource) + return bkr.handleRes(resource) } // OnDelete is called by the controller when a resource is deleted from the maestro server. -func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error { - resource, err := bkr.resourceService.Get(ctx, resourceID) +func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { + resource, err := bkr.resourceService.Get(ctx, id) if err != nil { return err } - return bkr.handleResEvent(ctx, eventID, resource) + return bkr.handleRes(resource) } // On StatusUpdate will be called on each new status event inserted into db. @@ -470,35 +486,16 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID) if svcErr != nil { - // if the resource is not found, it indicates the resource has been handled by - // other instances, so we can mark the event as reconciled and ignore it. + // if the resource is not found, it indicates the resource has been handled by other instances. 
if svcErr.Is404() { klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID) - now := time.Now() - evt.ReconciledDate = &now - if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil { - return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error()) - } return false, nil } return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error()) } - if bkr.IsConsumerSubscribed(resource.ConsumerName) { - return true, nil - } - - // if the consumer is not subscribed to the broker, then add the event instance record - // to indicate the event has been processed by the instance - if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: bkr.instanceID, - }); err != nil { - return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error()) - } - klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName) - - return false, nil + // check if the consumer is subscribed to the broker + return bkr.IsConsumerSubscribed(resource.ConsumerName), nil } // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. 
diff --git a/pkg/api/event_instances.go b/pkg/api/event_instances.go index 97141852..413147e8 100644 --- a/pkg/api/event_instances.go +++ b/pkg/api/event_instances.go @@ -1,9 +1,8 @@ package api type EventInstance struct { - EventID string `gorm:"default:null"` - SpecEventID string `gorm:"default:null"` - InstanceID string + EventID string + InstanceID string } type EventInstanceList []*EventInstance diff --git a/pkg/controllers/event_filter.go b/pkg/controllers/event_filter.go new file mode 100644 index 00000000..cdabc4b8 --- /dev/null +++ b/pkg/controllers/event_filter.go @@ -0,0 +1,96 @@ +package controllers + +import ( + "context" + "fmt" + + "github.com/openshift-online/maestro/pkg/db" +) + +// EventFilter defines an interface for filtering and deferring actions on events. +// Implementations of EventFilter should provide logic for determining whether an event +// should be processed and for handling any actions that need to be deferred. +// +// - Filter: Decides whether the event should be processed based on its ID. +// - DeferredAction: Allows for scheduling actions that should occur regardless of whether the event +// was processed successfully or not, such as cleanup tasks or releasing resources. +type EventFilter interface { + // Filter determines whether the event should be processed. + // Returns true if the event should be handled, false and an error otherwise. + Filter(ctx context.Context, id string) (bool, error) + + // DeferredAction schedules actions to be executed regardless of event processing success. + DeferredAction(ctx context.Context, id string) +} + +// LockBasedEventFilter implements EventFilter using a locking mechanism for event processing. +// It creates advisory locks on event IDs to ensure thread-safe access. +// - Filter acquires a lock on the event ID and returns true if the lock is successful. +// - DeferredAction releases the lock for the event ID. 
+type LockBasedEventFilter struct { + lockFactory db.LockFactory + // locks map is accessed by a single-threaded handler goroutine, no need for lock on it. + locks map[string]string +} + +func NewLockBasedEventFilter(lockFactory db.LockFactory) EventFilter { + return &LockBasedEventFilter{ + lockFactory: lockFactory, + locks: make(map[string]string), + } +} + +// Filter attempts to acquire a lock on the event ID. Returns true if successful, false and error otherwise. +func (h *LockBasedEventFilter) Filter(ctx context.Context, id string) (bool, error) { + // lock the Event with a fail-fast advisory lock context. + // this allows concurrent processing of many events by one or many controller managers. + // allow the lock to be released by the handler goroutine and allow this function to continue. + // subsequent events will be locked by their own distinct IDs. + lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) + // store the lock owner ID for deferred action + h.locks[id] = lockOwnerID + if err != nil { + return false, fmt.Errorf("error obtaining the event lock: %v", err) + } + + if !acquired { + logger.V(4).Infof("Event %s is processed by another worker", id) + return false, nil + } + + return true, nil +} + +// DeferredAction releases the lock for the given event ID if it was acquired. +func (h *LockBasedEventFilter) DeferredAction(ctx context.Context, id string) { + if ownerID, exists := h.locks[id]; exists { + h.lockFactory.Unlock(ctx, ownerID) + delete(h.locks, id) + } +} + +// eventFilterPredicate is a function type for filtering events based on their ID. +type eventFilterPredicate func(ctx context.Context, eventID string) (bool, error) + +// PredicatedEventFilter implements EventFilter using a predicate function for event filtering. +// - Filter uses the predicate to decide if the event should be processed. +// - DeferredAction is a no-op as no locking is performed. 
+type PredicatedEventFilter struct { + predicate eventFilterPredicate +} + +func NewPredicatedEventFilter(predicate eventFilterPredicate) EventFilter { + return &PredicatedEventFilter{ + predicate: predicate, + } +} + +// Filter calls the predicate function to determine if the event should be processed. +func (h *PredicatedEventFilter) Filter(ctx context.Context, id string) (bool, error) { + return h.predicate(ctx, id) +} + +// DeferredAction is a no-op since no locks are involved. +func (h *PredicatedEventFilter) DeferredAction(ctx context.Context, id string) { + // no-op +} diff --git a/pkg/controllers/event_filter_test.go b/pkg/controllers/event_filter_test.go new file mode 100644 index 00000000..2bb22e36 --- /dev/null +++ b/pkg/controllers/event_filter_test.go @@ -0,0 +1,150 @@ +package controllers + +import ( + "context" + "testing" + + "github.com/google/uuid" + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao/mocks" + dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" +) + +func TestLockingEventFilter(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + eventFilter := NewLockBasedEventFilter(dbmocks.NewMockAdvisoryLockFactory()) + + _, err := eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + shouldProcess, err := eventFilter.Filter(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + lockingEventFilter, ok := eventFilter.(*LockBasedEventFilter) + Expect(ok).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) + + eventFilter.DeferredAction(ctx, "1") + 
Expect(lockingEventFilter.locks).To(HaveLen(0)) + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventFilter.Filter(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) + + eventFilter.DeferredAction(ctx, "2") + Expect(lockingEventFilter.locks).To(HaveLen(0)) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + shouldProcess, err = eventFilter.Filter(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + Expect(lockingEventFilter.locks).To(HaveLen(1)) +} + +func TestPredicatedEventFilter(t *testing.T) { + RegisterTestingT(t) + + source := "my-event-source" + ctx := context.Background() + eventsDao := mocks.NewEventDao() + resourcesDao := mocks.NewResourceDao() + eventServer := &exampleEventServer{eventsDao: eventsDao, resourcesDao: resourcesDao, subscrbers: []string{"cluster1"}} + eventFilter := NewPredicatedEventFilter(eventServer.PredicateEvent) + + resID := uuid.New().String() + _, err := resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: resID}, + ConsumerName: "cluster1", + Source: source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "1"}, + Source: source, + SourceID: resID, + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "2"}, + Source: source, + SourceID: "any id", + EventType: api.CreateEventType, + }) + Expect(err).To(BeNil()) + + newResID := uuid.New().String() + _, err = resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: newResID}, + ConsumerName: "cluster2", + Source: source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ + Meta: api.Meta{ID: "3"}, + Source: source, + SourceID: newResID, + EventType: api.DeleteEventType, + }) + 
Expect(err).To(BeNil()) + + // handle event 1 + shouldProcess, err := eventFilter.Filter(ctx, "1") + Expect(err).To(BeNil()) + Expect(shouldProcess).To(BeTrue()) + + // call deferred action + eventFilter.DeferredAction(ctx, "1") + + event, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) + + // handle event 2 + shouldProcess, err = eventFilter.Filter(ctx, "2") + Expect(err).To(BeNil()) + Expect(shouldProcess).NotTo(BeTrue()) + + event, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).NotTo(BeNil()) + + // handle event 3 + shouldProcess, err = eventFilter.Filter(ctx, "3") + Expect(err).To(BeNil()) + Expect(shouldProcess).NotTo(BeTrue()) + + event, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(event.ReconciledDate).To(BeNil()) +} diff --git a/pkg/controllers/event_handler.go b/pkg/controllers/event_handler.go deleted file mode 100644 index 8794da61..00000000 --- a/pkg/controllers/event_handler.go +++ /dev/null @@ -1,177 +0,0 @@ -package controllers - -import ( - "context" - "fmt" - "time" - - "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/dao" - "github.com/openshift-online/maestro/pkg/db" - "github.com/openshift-online/maestro/pkg/services" - "k8s.io/klog/v2" -) - -// EventHandler defines the actions to handle an event at various stages of its lifecycle. -type EventHandler interface { - // ShouldHandleEvent determines whether the event should be processed. - // Returns true if the event should be handled, false and an error otherwise. - ShouldHandleEvent(ctx context.Context, id string) (bool, error) - - // DeferredAction schedules any deferred actions that need to be executed - // after the event is processed successfully or unsuccessfully. - DeferredAction(ctx context.Context, id string) - - // PostProcess is called after the event is processed to perform any cleanup - // or additional actions required for the event. 
- PostProcess(ctx context.Context, event *api.Event) error -} - -// LockBasedEventHandler is an implementation of EventHandler that uses a locking mechanism to control event processing. -// It leverages a lock factory to create advisory locks for each event ID, ensuring non-blocking, thread-safe access. -// - ShouldHandleEvent acquires the lock for the event ID and returns true if the lock is successful. -// - DeferredAction releases the lock for the event ID. -// - PostProcess updates the event with a reconciled date after processing. -type LockBasedEventHandler struct { - lockFactory db.LockFactory - locks map[string]string - events services.EventService -} - -func NewLockBasedEventHandler(lockFactory db.LockFactory, events services.EventService) EventHandler { - return &LockBasedEventHandler{ - lockFactory: lockFactory, - locks: make(map[string]string), - events: events, - } -} - -func (h *LockBasedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { - // lock the Event with a fail-fast advisory lock context. - // this allows concurrent processing of many events by one or many controller managers. - // allow the lock to be released by the handler goroutine and allow this function to continue. - // subsequent events will be locked by their own distinct IDs. 
- lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events) - // store the lock owner ID for deferred action - h.locks[id] = lockOwnerID - if err != nil { - return false, fmt.Errorf("error obtaining the event lock: %v", err) - } - - if !acquired { - logger.V(4).Infof("Event %s is processed by another worker", id) - return false, nil - } - - return true, nil -} - -func (h *LockBasedEventHandler) DeferredAction(ctx context.Context, id string) { - if ownerID, exists := h.locks[id]; exists { - h.lockFactory.Unlock(ctx, ownerID) - delete(h.locks, id) - } -} - -func (h *LockBasedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { - // update the event with the reconciled date - if event != nil { - now := time.Now() - event.ReconciledDate = &now - if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) - } - } - - return nil -} - -// eventHandlerPredicate is a function type for filtering events based on their ID. -type eventHandlerPredicate func(ctx context.Context, eventID string) (bool, error) - -// PredicatedEventHandler is an implementation of EventHandler that filters events using a predicate function. -// - ShouldHandleEvent uses the predicate to determine if the event should be processed by ID. -// - DeferredAction is a no-op as no locking is performed. -// - PostProcess updates the event with the reconciled date and checks if it's processed by all instances. -// If all instances have processed the event, it marks the event as reconciled. 
-type PredicatedEventHandler struct { - predicate eventHandlerPredicate - events services.EventService - eventInstanceDao dao.EventInstanceDao - instanceDao dao.InstanceDao -} - -func NewPredicatedEventHandler(predicate eventHandlerPredicate, events services.EventService, eventInstanceDao dao.EventInstanceDao, instanceDao dao.InstanceDao) EventHandler { - return &PredicatedEventHandler{ - predicate: predicate, - events: events, - eventInstanceDao: eventInstanceDao, - instanceDao: instanceDao, - } -} - -func (h *PredicatedEventHandler) ShouldHandleEvent(ctx context.Context, id string) (bool, error) { - return h.predicate(ctx, id) -} - -func (h *PredicatedEventHandler) DeferredAction(ctx context.Context, id string) { - // no-op -} - -func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Event) error { - // check the event and alive instances - // if the event is handled by all alive instances, mark the event as reconciled - activeInstances, err := h.instanceDao.FindReadyIDs(ctx) - if err != nil { - return fmt.Errorf("error finding ready instances: %v", err) - } - - processedInstances, err := h.eventInstanceDao.GetInstancesBySpecEventID(ctx, event.ID) - if err != nil { - return fmt.Errorf("error finding processed instances for event %s: %v", event.ID, err) - } - - // should never happen. If the event is not processed by any instance, return an error - if len(processedInstances) == 0 { - klog.V(10).Infof("Event %s is not processed by any instance", event.ID) - return fmt.Errorf("event %s is not processed by any instance", event.ID) - } - - // check if all instances have processed the event - // 1. In normal case, the activeInstances == eventInstances, mark the event as reconciled - // 2. If maestro server instance is up, but has't been marked as ready, then activeInstances < eventInstances, - // it's ok to mark the event as reconciled, as the instance is not ready to sever the request, no connected agents. - // 3. 
If maestro server instance is down, but has been marked as unready, it may still have connected agents, but - // the instance has stopped to handle the event, so activeInstances > eventInstances, the event should be equeued. - if !isSubSet(activeInstances, processedInstances) { - klog.V(10).Infof("Event %s is not processed by all active instances %v, handled by %v", event.ID, activeInstances, processedInstances) - return fmt.Errorf("event %s is not processed by all active instances", event.ID) - } - - // update the event with the reconciled date - now := time.Now() - event.ReconciledDate = &now - if _, svcErr := h.events.Replace(ctx, event); svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", event.ID, svcErr) - } - - return nil -} - -// isSubSet checks if slice a is a subset of slice b -func isSubSet(a, b []string) bool { - for _, v := range a { - found := false - for _, vv := range b { - if v == vv { - found = true - break - } - } - if !found { - return false - } - } - - return true -} diff --git a/pkg/controllers/event_handler_test.go b/pkg/controllers/event_handler_test.go deleted file mode 100644 index 8a00002d..00000000 --- a/pkg/controllers/event_handler_test.go +++ /dev/null @@ -1,185 +0,0 @@ -package controllers - -import ( - "context" - "testing" - - . 
"github.com/onsi/gomega" - "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/dao/mocks" - dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" - "github.com/openshift-online/maestro/pkg/services" -) - -func TestLockingEventHandler(t *testing.T) { - RegisterTestingT(t) - - source := "my-event-source" - ctx := context.Background() - eventsDao := mocks.NewEventDao() - events := services.NewEventService(eventsDao) - eventHandler := NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "1"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "2"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - - lockingEventHandler, ok := eventHandler.(*LockBasedEventHandler) - Expect(ok).To(BeTrue()) - Expect(lockingEventHandler.locks).To(HaveLen(1)) - - eventHandler.DeferredAction(ctx, "1") - Expect(lockingEventHandler.locks).To(HaveLen(0)) - - event, err := eventsDao.Get(ctx, "1") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - err = eventHandler.PostProcess(ctx, event) - Expect(err).To(BeNil()) - - event, err = eventsDao.Get(ctx, "1") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) - - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - Expect(lockingEventHandler.locks).To(HaveLen(1)) - - eventHandler.DeferredAction(ctx, "2") - Expect(lockingEventHandler.locks).To(HaveLen(0)) - - event, err = eventsDao.Get(ctx, "2") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") - Expect(err).To(BeNil()) - 
Expect(shouldProcess).To(BeTrue()) - Expect(lockingEventHandler.locks).To(HaveLen(1)) -} - -func TestPredicatedEventHandler(t *testing.T) { - RegisterTestingT(t) - - currentInstanceID := "test-instance" - anotherInstanceID := "another-instance" - source := "my-event-source" - ctx := context.Background() - eventsDao := mocks.NewEventDao() - events := services.NewEventService(eventsDao) - eventInstancesDao := mocks.NewEventInstanceDaoMock() - instancesDao := mocks.NewInstanceDao() - eventServer := &exampleEventServer{eventDao: eventsDao} - eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) - - // current instance is ready - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: currentInstanceID}, - Ready: true, - }) - - // second instance is not ready - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: anotherInstanceID}, - Ready: false, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "1"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ - Meta: api.Meta{ID: "2"}, - Source: source, - SourceID: "any id", - EventType: api.CreateEventType, - }) - - // handle event 1 - shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - - _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: "1", - InstanceID: currentInstanceID, - }) - Expect(err).To(BeNil()) - - eventHandler.DeferredAction(ctx, "1") - - // simulate the second instance handled the event, although it has not been marked as ready - _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: "1", - InstanceID: anotherInstanceID, - }) - Expect(err).To(BeNil()) - - event, err := eventsDao.Get(ctx, "1") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - // should post process the event the second 
instance is not ready - err = eventHandler.PostProcess(ctx, event) - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) - - // mark the second instance as ready - err = instancesDao.MarkReadyByIDs(ctx, []string{anotherInstanceID}) - Expect(err).To(BeNil()) - - // handle event 2 - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2") - Expect(err).To(BeNil()) - Expect(shouldProcess).To(BeTrue()) - - // simulate the current instance handled the event, the second instance is shutting down - // before it handled the event - _, err = eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: "2", - InstanceID: currentInstanceID, - }) - Expect(err).To(BeNil()) - - eventHandler.DeferredAction(ctx, "2") - - event, err = eventsDao.Get(ctx, "2") - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - err = eventHandler.PostProcess(ctx, event) - Expect(err).NotTo(BeNil()) - Expect(event.ReconciledDate).To(BeNil()) - - // mark the second instance as unready - err = instancesDao.MarkUnreadyByIDs(ctx, []string{anotherInstanceID}) - Expect(err).To(BeNil()) - - // simulate requeue the event - err = eventHandler.PostProcess(ctx, event) - Expect(err).To(BeNil()) - Expect(event.ReconciledDate).NotTo(BeNil()) - - shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3") - Expect(err).NotTo(BeNil()) - Expect(shouldProcess).To(BeFalse()) -} diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 62c56321..b55819ea 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -44,7 +44,7 @@ var logger = maestrologger.NewOCMLogger(context.Background()) // events sync will help us to handle unexpected errors (e.g. 
sever restart), it ensures we will not miss any events var defaultEventsSyncPeriod = 10 * time.Hour -type ControllerHandlerFunc func(ctx context.Context, eventID, sourceID string) error +type ControllerHandlerFunc func(ctx context.Context, id string) error type ControllerConfig struct { Source string @@ -52,18 +52,18 @@ type ControllerConfig struct { } type KindControllerManager struct { - controllers map[string]map[api.EventType][]ControllerHandlerFunc - eventHandler EventHandler - events services.EventService - eventsQueue workqueue.RateLimitingInterface + controllers map[string]map[api.EventType][]ControllerHandlerFunc + eventFilter EventFilter + events services.EventService + eventsQueue workqueue.RateLimitingInterface } -func NewKindControllerManager(eventHandler EventHandler, events services.EventService) *KindControllerManager { +func NewKindControllerManager(eventFilter EventFilter, events services.EventService) *KindControllerManager { return &KindControllerManager{ - controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, - eventHandler: eventHandler, - events: events, - eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), + controllers: map[string]map[api.EventType][]ControllerHandlerFunc{}, + eventFilter: eventFilter, + events: events, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "event-controller"), } } @@ -110,8 +110,8 @@ func (km *KindControllerManager) handleEvent(id string) error { reqContext := context.WithValue(context.Background(), EventID, id) // check if the event should be processed by this instance - shouldProcess, err := km.eventHandler.ShouldHandleEvent(reqContext, id) - defer km.eventHandler.DeferredAction(reqContext, id) + shouldProcess, err := km.eventFilter.Filter(reqContext, id) + defer km.eventFilter.DeferredAction(reqContext, id) if err != nil { return fmt.Errorf("error filtering event with id (%s): %s", id, err) } 
@@ -151,13 +151,20 @@ func (km *KindControllerManager) handleEvent(id string) error { } for _, fn := range handlerFns { - err := fn(reqContext, id, event.SourceID) + err := fn(reqContext, event.SourceID) if err != nil { return fmt.Errorf("error handing event %s-%s (%s): %s", event.Source, event.EventType, id, err) } } - return km.eventHandler.PostProcess(reqContext, event) + // all handlers successfully executed + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := km.events.Replace(reqContext, event); svcErr != nil { + return fmt.Errorf("error updating event with id (%s): %s", id, svcErr) + } + + return nil } func (km *KindControllerManager) runWorker() { diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index c91a1374..10aebd35 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -2,14 +2,18 @@ package controllers import ( "context" + "fmt" "testing" + "time" + "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/dao/mocks" dbmocks "github.com/openshift-online/maestro/pkg/db/mocks" "github.com/openshift-online/maestro/pkg/services" + "gorm.io/gorm" ) func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { @@ -24,76 +28,61 @@ func newExampleControllerConfig(ctrl *exampleController) *ControllerConfig { } type exampleController struct { - instanceID string - eventInstancesDao dao.EventInstanceDao - addCounter int - updateCounter int - deleteCounter int + addCounter int + updateCounter int + deleteCounter int } -func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error { +func (d *exampleController) OnAdd(ctx context.Context, id string) error { d.addCounter++ - _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: d.instanceID, - }) - return err + return nil 
} -func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error { +func (d *exampleController) OnUpdate(ctx context.Context, id string) error { d.updateCounter++ - _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: d.instanceID, - }) - return err + return nil } -func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error { +func (d *exampleController) OnDelete(ctx context.Context, id string) error { d.deleteCounter++ - _, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: d.instanceID, - }) - return err + return nil } -func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { +func TestControllerFrameworkWithLockBasedEventFilter(t *testing.T) { RegisterTestingT(t) ctx := context.Background() eventsDao := mocks.NewEventDao() events := services.NewEventService(eventsDao) - eventInstancesDao := mocks.NewEventInstanceDaoMock() - mgr := NewKindControllerManager(NewLockBasedEventHandler(dbmocks.NewMockAdvisoryLockFactory(), events), events) + mgr := NewKindControllerManager(NewLockBasedEventFilter(dbmocks.NewMockAdvisoryLockFactory()), events) - ctrl := &exampleController{ - instanceID: "instance-1", - eventInstancesDao: eventInstancesDao, - } + ctrl := &exampleController{} config := newExampleControllerConfig(ctrl) mgr.Add(config) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err := eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "1"}, Source: config.Source, SourceID: "any id", EventType: api.CreateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "2"}, Source: config.Source, SourceID: "any id", EventType: api.UpdateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "3"}, Source: config.Source, SourceID: 
"any id", EventType: api.DeleteEventType, }) + Expect(err).To(BeNil()) mgr.handleEvent("1") mgr.handleEvent("2") @@ -103,88 +92,126 @@ func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) { Expect(ctrl.updateCounter).To(Equal(1)) Expect(ctrl.deleteCounter).To(Equal(1)) - eve, _ := eventsDao.Get(ctx, "1") + eve, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") - eve, _ = eventsDao.Get(ctx, "2") + eve, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") - eve, _ = eventsDao.Get(ctx, "3") + eve, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") } type exampleEventServer struct { - eventDao dao.EventDao + eventsDao dao.EventDao + resourcesDao dao.ResourceDao + subscrbers []string } func (e *exampleEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) { - _, err := e.eventDao.Get(ctx, eventID) + event, err := e.eventsDao.Get(ctx, eventID) + if err != nil { + return false, err + } + resource, err := e.resourcesDao.Get(ctx, event.SourceID) if err != nil { + // 404 == gorm.ErrRecordNotFound means the resource was deleted, so we can ignore the event + if err == gorm.ErrRecordNotFound { + now := time.Now() + event.ReconciledDate = &now + if _, svcErr := e.eventsDao.Replace(ctx, event); svcErr != nil { + return false, fmt.Errorf("failed to update event %s: %s", event.ID, svcErr.Error()) + } + return false, nil + } return false, err } - return true, nil + return contains(e.subscrbers, resource.ConsumerName), nil +} + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false } -func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) { +func TestControllerFrameworkWithPredicatedEventFilter(t *testing.T) { 
RegisterTestingT(t) - currentInstanceID := "test-instance" - anotherInstanceID := "another-instance" ctx := context.Background() eventsDao := mocks.NewEventDao() + resourcesDao := mocks.NewResourceDao() events := services.NewEventService(eventsDao) - eventServer := &exampleEventServer{eventDao: eventsDao} - eventInstancesDao := mocks.NewEventInstanceDaoMock() - instancesDao := mocks.NewInstanceDao() - eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao) - mgr := NewKindControllerManager(eventHandler, events) - - ctrl := &exampleController{ - instanceID: currentInstanceID, - eventInstancesDao: eventInstancesDao, - } + eventServer := &exampleEventServer{eventsDao: eventsDao, resourcesDao: resourcesDao, subscrbers: []string{"cluster1"}} + mgr := NewKindControllerManager(NewPredicatedEventFilter(eventServer.PredicateEvent), events) + + ctrl := &exampleController{} config := newExampleControllerConfig(ctrl) mgr.Add(config) - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: currentInstanceID}, - Ready: true, + resID := uuid.New().String() + _, err := resourcesDao.Create(ctx, &api.Resource{ + Meta: api.Meta{ID: resID}, + ConsumerName: "cluster1", + Source: config.Source, }) + Expect(err).To(BeNil()) - _, _ = instancesDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ID: anotherInstanceID}, - Ready: false, - }) - - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "1"}, Source: config.Source, - SourceID: "any id", + SourceID: resID, EventType: api.CreateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "2"}, Source: config.Source, SourceID: "any id", EventType: api.UpdateEventType, }) + Expect(err).To(BeNil()) - _, _ = eventsDao.Create(ctx, &api.Event{ + newResID := uuid.New().String() + _, err = resourcesDao.Create(ctx, &api.Resource{ + 
Meta: api.Meta{ID: newResID}, + ConsumerName: "cluster2", + Source: config.Source, + }) + Expect(err).To(BeNil()) + + _, err = eventsDao.Create(ctx, &api.Event{ Meta: api.Meta{ID: "3"}, Source: config.Source, - SourceID: "any id", + SourceID: newResID, EventType: api.DeleteEventType, }) + Expect(err).To(BeNil()) mgr.handleEvent("1") mgr.handleEvent("2") mgr.handleEvent("3") Expect(ctrl.addCounter).To(Equal(1)) - Expect(ctrl.updateCounter).To(Equal(1)) - Expect(ctrl.deleteCounter).To(Equal(1)) + Expect(ctrl.updateCounter).To(Equal(0)) + Expect(ctrl.deleteCounter).To(Equal(0)) - eve, _ := eventsDao.Get(ctx, "1") + eve, err := eventsDao.Get(ctx, "1") + Expect(err).To(BeNil()) Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "2") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set") + + eve, err = eventsDao.Get(ctx, "3") + Expect(err).To(BeNil()) + Expect(eve.ReconciledDate).To(BeNil(), "event reconcile date should not be set") } diff --git a/pkg/dao/event_instance.go b/pkg/dao/event_instance.go index 687a48ea..7ec1eef5 100644 --- a/pkg/dao/event_instance.go +++ b/pkg/dao/event_instance.go @@ -12,8 +12,8 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) - GetInstancesBySpecEventID(ctx context.Context, eventID string) ([]string, error) - FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) + + FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) } @@ -47,20 +47,7 @@ func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.Eve return eventInstance, nil } -func (d *sqlEventInstanceDao) 
GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { - g2 := (*d.sessionFactory).New(ctx) - var eventInstances []api.EventInstance - if err := g2.Model(&api.EventInstance{}).Where("spec_event_id = ?", specEventID).Find(&eventInstances).Error; err != nil { - return nil, err - } - instanceIDs := make([]string, len(eventInstances)) - for i, eventInstance := range eventInstances { - instanceIDs[i] = eventInstance.InstanceID - } - return instanceIDs, nil -} - -func (d *sqlEventInstanceDao) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { +func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { g2 := (*d.sessionFactory).New(ctx) eventInstances := api.EventInstanceList{} if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil { @@ -83,7 +70,7 @@ func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Conte // consider using join to optimize if err := g2.Table("event_instances"). Select("event_id"). - Where("instance_id IN (?) AND event_id IS NOT NULL", instanceIDs). + Where("instance_id IN ?", instanceIDs). Group("event_id"). Having("COUNT(DISTINCT instance_id) = ?", instanceCount). 
Scan(&eventIDs).Error; err != nil { diff --git a/pkg/dao/mocks/event.go b/pkg/dao/mocks/event.go index 60833a0d..a7e9559b 100755 --- a/pkg/dao/mocks/event.go +++ b/pkg/dao/mocks/event.go @@ -7,7 +7,6 @@ import ( "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/dao" - "github.com/openshift-online/maestro/pkg/errors" ) var _ dao.EventDao = &eventDaoMock{} @@ -59,7 +58,15 @@ func (d *eventDaoMock) Delete(ctx context.Context, id string) error { } func (d *eventDaoMock) FindByIDs(ctx context.Context, ids []string) (api.EventList, error) { - return nil, errors.NotImplemented("Event").AsError() + filteredEvents := api.EventList{} + for _, id := range ids { + for _, e := range d.events { + if e.ID == id { + filteredEvents = append(filteredEvents, e) + } + } + } + return filteredEvents, nil } func (d *eventDaoMock) All(ctx context.Context) (api.EventList, error) { diff --git a/pkg/dao/mocks/event_instance.go b/pkg/dao/mocks/event_instance.go index 13bf8ce0..e8ebef59 100644 --- a/pkg/dao/mocks/event_instance.go +++ b/pkg/dao/mocks/event_instance.go @@ -42,21 +42,7 @@ func (d *eventInstanceDaoMock) Create(ctx context.Context, eventInstance *api.Ev return eventInstance, nil } -func (d *eventInstanceDaoMock) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) { - d.mux.RLock() - defer d.mux.RUnlock() - - var instanceIDs []string - for _, ei := range d.eventInstances { - if ei.SpecEventID == specEventID { - instanceIDs = append(instanceIDs, ei.InstanceID) - } - } - - return instanceIDs, nil -} - -func (d *eventInstanceDaoMock) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) { +func (d *eventInstanceDaoMock) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() diff --git a/test/helper.go b/test/helper.go index 70eab990..331a79e4 100755 --- a/test/helper.go +++ b/test/helper.go @@ -82,7 +82,7 @@ 
type Helper struct { MetricsServer server.Server HealthCheckServer *server.HealthCheckServer EventServer server.EventServer - EventHandler controllers.EventHandler + EventFilter controllers.EventFilter ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder WorkAgentInformer workv1informers.ManifestWorkInformer @@ -150,10 +150,10 @@ func NewHelper(t *testing.T) *Helper { ) helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher) helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) - helper.EventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory), helper.Env().Services.Events()) + helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory)) } else { helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) - helper.EventHandler = controllers.NewPredicatedEventHandler(helper.EventServer.PredicateEvent, helper.Env().Services.Events(), dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), dao.NewInstanceDao(&helper.Env().Database.SessionFactory)) + helper.EventFilter = controllers.NewPredicatedEventFilter(helper.EventServer.PredicateEvent) } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -259,7 +259,7 @@ func (helper *Helper) startEventBroadcaster() { func (helper *Helper) StartControllerManager(ctx context.Context) { helper.ControllerManager = &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - helper.EventHandler, + helper.EventFilter, helper.Env().Services.Events(), ), StatusController: controllers.NewStatusController( diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index f5459021..9f4cb965 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -20,9 +20,6 @@ import ( func 
TestControllerRacing(t *testing.T) { h, _ := test.RegisterIntegration(t) - // sleep for a while to wait for server instance is added - time.Sleep(1 * time.Second) - account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) defer func() { @@ -35,19 +32,18 @@ func TestControllerRacing(t *testing.T) { eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) - eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) // The handler filters the events by source id/type/reconciled, and only record // the event with create type. Due to the event lock, each create event // should be only processed once. var proccessedEvent, processedStatusEvent []string - onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { events, err := eventDao.All(ctx) if err != nil { return err } for _, evt := range events { - if evt.SourceID != resourceID { + if evt.SourceID != id { continue } if evt.EventType != api.CreateEventType { @@ -57,15 +53,7 @@ func TestControllerRacing(t *testing.T) { if evt.ReconciledDate != nil { continue } - proccessedEvent = append(proccessedEvent, resourceID) - // add the event instance record - _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: h.Env().Config.MessageBroker.ClientID, - }) - if err != nil { - return err - } + proccessedEvent = append(proccessedEvent, id) } return nil @@ -91,21 +79,23 @@ func TestControllerRacing(t *testing.T) { return nil } - // Start 3 controllers concurrently + // Start 3 controllers concurrently for message queue event server threads := 3 - if h.Broker == "grpc" { - threads = 1 - } + randNum := rand.Intn(3) for i := 0; i < threads; i++ { - // each controller has its own event handler, otherwise, the event lock will block the event processing. 
- eventHandler := controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), h.Env().Services.Events()) + // each controller has its own event filter, otherwise, the event lock will block the event processing. + eventFilter := controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory)) if h.Broker == "grpc" { - eventHandler = controllers.NewPredicatedEventHandler(h.EventServer.PredicateEvent, h.Env().Services.Events(), dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), dao.NewInstanceDao(&h.Env().Database.SessionFactory)) + eventFilter = controllers.NewPredicatedEventFilter(func(ctx context.Context, eventID string) (bool, error) { + // simulate the event filter, where the agent randomly connects to a grpc broker instance. + // in theory, only one broker instance should process the event. + return i == randNum, nil + }) } go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - eventHandler, + eventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -168,9 +158,6 @@ func TestControllerRacing(t *testing.T) { func TestControllerReconcile(t *testing.T) { h, _ := test.RegisterIntegration(t) - // sleep for a while to wait for server instance is added - time.Sleep(1 * time.Second) - account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) @@ -180,26 +167,16 @@ func TestControllerReconcile(t *testing.T) { eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) - eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) processedEventTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, // and then, the controller will requeue this event, at that time, we handle this event successfully. 
- onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { processedEventTimes = processedEventTimes + 1 if processedEventTimes == 1 { return fmt.Errorf("failed to process the event") } - // add the event instance record - _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: h.Env().Config.MessageBroker.ClientID, - }) - if err != nil { - return err - } - return nil } @@ -219,7 +196,7 @@ func TestControllerReconcile(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - h.EventHandler, + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -244,7 +221,7 @@ func TestControllerReconcile(t *testing.T) { s.Start(ctx) }() // wait for the listener to start - time.Sleep(100 * time.Millisecond) + time.Sleep(time.Second) deployName := fmt.Sprintf("nginx-%s", rand.String(5)) resource := h.CreateResource(consumer.Name, deployName, 1) @@ -303,9 +280,6 @@ func TestControllerReconcile(t *testing.T) { func TestControllerSync(t *testing.T) { h, _ := test.RegisterIntegration(t) - // sleep for a while to wait for server instance is added - time.Sleep(1 * time.Second) - account := h.NewRandAccount() ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) @@ -338,7 +312,6 @@ func TestControllerSync(t *testing.T) { } eventDao := dao.NewEventDao(&h.Env().Database.SessionFactory) - eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) now := time.Now() if _, err := eventDao.Create(ctx, &api.Event{Source: "Resources", SourceID: "resource1", @@ -370,15 +343,10 @@ func TestControllerSync(t *testing.T) { } var proccessedEvents []string - onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { // we just record the processed event - proccessedEvents = 
append(proccessedEvents, resourceID) - // add the event instance record - _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ - SpecEventID: eventID, - InstanceID: h.Env().Config.MessageBroker.ClientID, - }) - return err + proccessedEvents = append(proccessedEvents, id) + return nil } // start the controller, once the controller started, it will sync the events: @@ -387,7 +355,7 @@ func TestControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - h.EventHandler, + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -509,7 +477,7 @@ func TestStatusControllerSync(t *testing.T) { go func() { s := &server.ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( - h.EventHandler, + h.EventFilter, h.Env().Services.Events(), ), StatusController: controllers.NewStatusController( @@ -543,7 +511,7 @@ func TestStatusControllerSync(t *testing.T) { return fmt.Errorf("should purge the events %s, but got %+v", purged, events) } - eventInstances, err := eventInstanceDao.FindEventInstancesByEventIDs(ctx, purged) + eventInstances, err := eventInstanceDao.FindStatusEvents(ctx, purged) if err != nil { return err }