Skip to content

Commit

Permalink
event handler to event filter.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 24, 2024
1 parent 1cf6559 commit 9172a13
Show file tree
Hide file tree
Showing 16 changed files with 470 additions and 612 deletions.
12 changes: 4 additions & 8 deletions cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,11 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
var eventHandler controllers.EventHandler
var eventFilter controllers.EventFilter
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
eventHandler = controllers.NewPredicatedEventHandler(eventServer.PredicateEvent,
environments.Environment().Services.Events(),
dao.NewEventInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewInstanceDao(&environments.Environment().Database.SessionFactory))
eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent)
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
Expand All @@ -74,14 +71,13 @@ func runServer(cmd *cobra.Command, args []string) {
// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventHandler = controllers.NewLockBasedEventHandler(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory),
environments.Environment().Services.Events())
eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
controllersServer := server.NewControllersServer(eventServer, eventHandler)
controllersServer := server.NewControllersServer(eventServer, eventFilter)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(eventServer EventServer, eventHandler controllers.EventHandler) *ControllersServer {
func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer {
s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
eventHandler,
eventFilter,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down
12 changes: 6 additions & 6 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ type EventServer interface {
Start(ctx context.Context)

// OnCreate handles the creation of a resource.
OnCreate(ctx context.Context, eventID, resourceID string) error
OnCreate(ctx context.Context, resourceID string) error

// OnUpdate handles updates to a resource.
OnUpdate(ctx context.Context, eventID, resourceID string) error
OnUpdate(ctx context.Context, resourceID string) error

// OnDelete handles the deletion of a resource.
OnDelete(ctx context.Context, eventID, resourceID string) error
OnDelete(ctx context.Context, resourceID string) error

// OnStatusUpdate handles status update events for a resource.
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error
Expand Down Expand Up @@ -117,17 +117,17 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

Expand Down
111 changes: 54 additions & 57 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ import (
"github.com/openshift-online/maestro/pkg/services"
)

type resourceHandler func(res *api.Resource) error
// resourceHandler processes a resource spec by encoding it to a CloudEvent and sending it to the subscriber.
// It returns a bool indicating if the connection is closed and an error if one occurs.
// - Returns true and an error if the connection is closed.
// - Returns false and an error if encoding fails.
// - Returns false and nil if successful.
type resourceHandler func(res *api.Resource) (bool, error)

// subscriber defines a subscriber that can receive and handle resource spec.
type subscriber struct {
Expand Down Expand Up @@ -182,36 +187,55 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
return fmt.Errorf("invalid subscription request: missing cluster name")
}
// register the cluster for subscription to the resource spec
subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error {
subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) (bool, error) {
evt, err := encodeResourceSpec(res)
if err != nil {
return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err)
// return the error to requeue the event if encoding fails (e.g., due to invalid resource spec).
return false, fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err)
}

klog.V(4).Infof("send the event to spec subscribers, %s", evt)

// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
pbEvt := &pbv1.CloudEvent{}
if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil {
return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err)
// return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent).
return false, fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err)
}

// send the cloudevent to the subscriber
// TODO: error handling to address errors beyond network issues.
klog.V(4).Infof("sending the event to spec subscribers, %s", evt)
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
return err // return error to requeue the spec event
// Return true to ensure the subscriber will be unregistered when sending fails, which will close the subserver stream.
// See: https://github.com/grpc/grpc-go/blob/b615b35c4feb932a0ac658fb86b7127f10ef664e/stream.go#L1537 for more details.
// Return the error without wrapping, as it contains the gRPC error code and message for future (TODO) handling beyond network issues.
// This will also not requeue the event, as the error will cause the connection to the subscriber to be closed.
// If the subscriber (agent) reconnects, rely on the agent's resync to retrieve the missing resource spec.
return true, err
}

return nil
return false, nil
})

select {
case err := <-errChan:
// An error occurred while sending the event to the subscriber.
// This could be due to multiple reasons:
// see: https://grpc.io/docs/guides/error/
// 1. general errors such as: deadline exceeded before returning the response.
// 2. network errors such as: connection closed by an intermediate proxy.
// 3. protocol errors such as: compression error or flow control error.
// In all above cases, unregister the subscriber.
// TODO: unregister the subscriber if the error is a network error and the connection could be re-established.
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
bkr.unregister(subscriberID)
return err
case <-subServer.Context().Done():
// The context of the stream has been canceled or completed.
// This could happen if:
// - The client closed the connection or canceled the stream.
// - The server closed the stream, potentially due to a shutdown.
// Regardless of the reason, unregister the subscriber and stop processing.
// No error is returned here because the stream closure is expected.
bkr.unregister(subscriberID)
return nil
}
Expand Down Expand Up @@ -380,61 +404,53 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
}

// handleRes publishes the resource to the correct subscriber.
func (bkr *GRPCBroker) handleRes(resource *api.Resource) {
func (bkr *GRPCBroker) handleRes(resource *api.Resource) error {
bkr.mu.RLock()
defer bkr.mu.RUnlock()
for _, subscriber := range bkr.subscribers {
if subscriber.clusterName == resource.ConsumerName {
if err := subscriber.handler(resource); err != nil {
subscriber.errChan <- err
if isConnClosed, err := subscriber.handler(resource); err != nil {
if isConnClosed {
// if the connection is closed, write the error to the subscriber's error channel
// to ensure the subscriber is unregistered
subscriber.errChan <- err
}
// return the error to requeue the event if handling fails.
return err
}
}
}
}

// handleResEvent publish the resource to the correct subscriber and add the event instance record.
func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resource *api.Resource) error {
bkr.handleRes(resource)

// add the event instance record to mark the event has been processed by the current instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}

return nil
}

// OnCreate is called by the controller when a resource is created on the maestro server.
func (bkr *GRPCBroker) OnCreate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
return err
}

return bkr.handleResEvent(ctx, eventID, resource)
return bkr.handleRes(resource)
}

// OnUpdate is called by the controller when a resource is updated on the maestro server.
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
return err
}

return bkr.handleResEvent(ctx, eventID, resource)
return bkr.handleRes(resource)
}

// OnDelete is called by the controller when a resource is deleted from the maestro server.
func (bkr *GRPCBroker) OnDelete(ctx context.Context, eventID, resourceID string) error {
resource, err := bkr.resourceService.Get(ctx, resourceID)
func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
resource, err := bkr.resourceService.Get(ctx, id)
if err != nil {
return err
}

return bkr.handleResEvent(ctx, eventID, resource)
return bkr.handleRes(resource)
}

// OnStatusUpdate will be called on each new status event inserted into db.
Expand Down Expand Up @@ -470,35 +486,16 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by
// other instances, so we can mark the event as reconciled and ignore it.
// if the resource is not found, it indicates the resource has been handled by other instances.
if svcErr.Is404() {
klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error())
}
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
return true, nil
}

// if the consumer is not subscribed to the broker, then add the event instance record
// to indicate the event has been processed by the instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
klog.V(10).Infof("The consumer %s is not subscribed to the broker, added the event instance record", resource.ConsumerName)

return false, nil
// check if the consumer is subscribed to the broker
return bkr.IsConsumerSubscribed(resource.ConsumerName), nil
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
Expand Down
5 changes: 2 additions & 3 deletions pkg/api/event_instances.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package api

type EventInstance struct {
EventID string `gorm:"default:null"`
SpecEventID string `gorm:"default:null"`
InstanceID string
EventID string
InstanceID string
}

type EventInstanceList []*EventInstance
96 changes: 96 additions & 0 deletions pkg/controllers/event_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package controllers

import (
"context"
"fmt"

"github.com/openshift-online/maestro/pkg/db"
)

// EventFilter defines an interface for filtering and deferring actions on events.
// Implementations of EventFilter should provide logic for determining whether an event
// should be processed and for handling any actions that need to be deferred.
//
// - Filter: Decides whether the event should be processed based on its ID.
// - DeferredAction: Allows for scheduling actions that should occur regardless of whether the event
// was processed successfully or not, such as cleanup tasks or releasing resources.
type EventFilter interface {
// Filter determines whether the event identified by id should be processed.
// It returns:
//   - (true, nil) if the event should be handled;
//   - (false, nil) if the event should be skipped without treating it as a failure
//     (e.g. the lock-based implementation returns this when the lock is not acquired);
//   - (false, err) if the decision itself could not be made.
Filter(ctx context.Context, id string) (bool, error)

// DeferredAction performs follow-up work for the event identified by id, such as
// releasing resources taken by Filter. It is meant to be executed regardless of
// whether event processing succeeded.
DeferredAction(ctx context.Context, id string)
}

// LockBasedEventFilter implements EventFilter using a locking mechanism for event processing.
// It creates advisory locks on event IDs to ensure thread-safe access.
// - Filter acquires a lock on the event ID and returns true if the lock is successful.
// - DeferredAction releases the lock for the event ID.
type LockBasedEventFilter struct {
// lockFactory creates the non-blocking per-event locks requested by Filter and
// is used by DeferredAction to unlock them.
lockFactory db.LockFactory
// locks maps an event ID to the lock owner ID returned by lockFactory for that event.
// Filter writes an entry on every call; DeferredAction removes it after unlocking.
// locks map is accessed by a single-threaded handler goroutine, no need for lock on it.
locks map[string]string
}

// NewLockBasedEventFilter builds an EventFilter backed by the given lock factory.
// The returned filter starts with an empty event-ID to lock-owner-ID table.
func NewLockBasedEventFilter(lockFactory db.LockFactory) EventFilter {
	filter := new(LockBasedEventFilter)
	filter.lockFactory = lockFactory
	filter.locks = map[string]string{}
	return filter
}

// Filter attempts to acquire a non-blocking lock keyed by the event ID.
//
// It returns:
//   - (true, nil) when the lock was acquired and the event should be processed here;
//   - (false, nil) when the lock was not acquired, i.e. the event is being processed
//     by another worker;
//   - (false, err) when obtaining the lock failed. The underlying error is wrapped,
//     so callers can inspect it with errors.Is / errors.As.
func (h *LockBasedEventFilter) Filter(ctx context.Context, id string) (bool, error) {
	// lock the Event with a fail-fast advisory lock context.
	// this allows concurrent processing of many events by one or many controller managers.
	// allow the lock to be released by the handler goroutine and allow this function to continue.
	// subsequent events will be locked by their own distinct IDs.
	lockOwnerID, acquired, err := h.lockFactory.NewNonBlockingLock(ctx, id, db.Events)
	// Record the lock owner ID before looking at err/acquired, so that
	// DeferredAction calls Unlock for this event whatever the outcome is.
	h.locks[id] = lockOwnerID
	if err != nil {
		return false, fmt.Errorf("error obtaining the event lock: %w", err)
	}

	if !acquired {
		logger.V(4).Infof("Event %s is processed by another worker", id)
		return false, nil
	}

	return true, nil
}

// DeferredAction unlocks the lock recorded for the given event ID, if any,
// and forgets the recorded lock owner ID. It does nothing when Filter has not
// recorded an entry for this event ID.
func (h *LockBasedEventFilter) DeferredAction(ctx context.Context, id string) {
	lockOwnerID, recorded := h.locks[id]
	if !recorded {
		return
	}

	h.lockFactory.Unlock(ctx, lockOwnerID)
	delete(h.locks, id)
}

// eventFilterPredicate is a function type for filtering events based on their ID.
// It has the same shape as EventFilter.Filter: it receives the event ID and reports
// whether the event should be processed, or an error if that could not be determined.
type eventFilterPredicate func(ctx context.Context, eventID string) (bool, error)

// PredicatedEventFilter implements EventFilter using a predicate function for event filtering.
// - Filter uses the predicate to decide if the event should be processed.
// - DeferredAction is a no-op as no locking is performed.
type PredicatedEventFilter struct {
// predicate is invoked by Filter with the event ID; its result is returned unchanged.
predicate eventFilterPredicate
}

// NewPredicatedEventFilter builds an EventFilter whose Filter method delegates
// to the supplied predicate.
func NewPredicatedEventFilter(predicate eventFilterPredicate) EventFilter {
	filter := new(PredicatedEventFilter)
	filter.predicate = predicate
	return filter
}

// Filter delegates the decision to the configured predicate and returns its
// verdict and error unchanged.
func (h *PredicatedEventFilter) Filter(ctx context.Context, id string) (bool, error) {
	shouldProcess, err := h.predicate(ctx, id)
	return shouldProcess, err
}

// DeferredAction is a no-op since no locks are involved.
// It exists so that PredicatedEventFilter satisfies the EventFilter interface;
// Filter acquires nothing that would need to be released here.
func (h *PredicatedEventFilter) DeferredAction(ctx context.Context, id string) {
// no-op
}
Loading

0 comments on commit 9172a13

Please sign in to comment.