From 59981a32447be51006db3f1bb233217d0f2b3490 Mon Sep 17 00:00:00 2001 From: morvencao Date: Mon, 16 Dec 2024 02:52:19 +0000 Subject: [PATCH] rename mqtt event server to also cover kafka. Signed-off-by: morvencao --- cmd/maestro/servecmd/cmd.go | 17 ++++++------- cmd/maestro/server/event_server.go | 37 ++++++++++++++-------------- cmd/maestro/server/grpc_broker.go | 2 +- pkg/config/event_server.go | 5 ++-- test/helper.go | 2 +- test/integration/status_hash_test.go | 4 +-- 6 files changed, 31 insertions(+), 36 deletions(-) diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index 876da6e6..1b3611df 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -45,11 +45,13 @@ func runServer(cmd *cobra.Command, args []string) { // Create the event server based on the message broker type: // For gRPC, create a gRPC broker to handle resource spec and status events. - // For MQTT, create a Pulse server to handle resource spec and status events. + // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. var eventServer server.EventServer - switch environments.Environment().Config.MessageBroker.MessageBrokerType { - case "mqtt": - klog.Info("Setting up pulse server") + if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { + klog.Info("Setting up grpc broker") + eventServer = server.NewGRPCBroker(eventBroadcaster) + } else { + klog.Info("Setting up message queue event server") var statusDispatcher dispatcher.Dispatcher subscriptionType := environments.Environment().Config.EventServer.SubscriptionType switch config.SubscriptionType(subscriptionType) { @@ -64,12 +66,7 @@ func runServer(cmd *cobra.Command, args []string) { // Set the status dispatcher for the healthcheck server healthcheckServer.SetStatusDispatcher(statusDispatcher) - eventServer = server.NewMQTTEventServer(eventBroadcaster, statusDispatcher) - case "grpc": - klog.Info("Setting up grpc broker") - eventServer = server.NewGRPCBroker(eventBroadcaster) - default: - klog.Errorf("Unsupported message broker type: %s", environments.Environment().Config.MessageBroker.MessageBrokerType) + eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) } // Create the servers diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index 77ba990b..cae4207c 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -41,13 +41,13 @@ type EventServer interface { OnStatusUpdate(ctx context.Context, eventID, resourceID string) error } -var _ EventServer = &MQTTEventServer{} +var _ EventServer = &MessageQueueEventServer{} -// MQTTEventServer represents a server responsible for publish resource spec events from -// resource controller and handle resource status update events from the maestro agent. -// It also periodic heartbeat updates and checking the liveness of Maestro instances, -// triggering status resync based on instances' status and other conditions. -type MQTTEventServer struct { +// MessageQueueEventServer represents a event server responsible for publish resource spec events +// from resource controller and handle resource status update events from the message queue. +// It also maintains a status dispatcher to dispatch status update events to the corresponding +// maestro instances. +type MessageQueueEventServer struct { instanceID string eventInstanceDao dao.EventInstanceDao lockFactory db.LockFactory @@ -58,9 +58,9 @@ type MQTTEventServer struct { statusDispatcher dispatcher.Dispatcher } -func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer { +func NewMessageQueueEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer { sessionFactory := env().Database.SessionFactory - return &MQTTEventServer{ + return &MessageQueueEventServer{ instanceID: env().Config.MessageBroker.ClientID, eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), lockFactory: db.NewAdvisoryLockFactory(sessionFactory), @@ -72,11 +72,10 @@ func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatch } } -// Start initializes and runs the pulse server, updating and checking Maestro instances' liveness, -// initializes subscription to status update messages and triggers status resync based on -// instances' status and other conditions. -func (s *MQTTEventServer) Start(ctx context.Context) { - log.Infof("Starting pulse server") +// Start initializes and runs the event server. It starts the subscription +// to resource status update messages and the status dispatcher. +func (s *MessageQueueEventServer) Start(ctx context.Context) { + log.Infof("Starting message queue event server") // start subscribing to resource status update messages. s.startSubscription(ctx) @@ -85,12 +84,12 @@ func (s *MQTTEventServer) Start(ctx context.Context) { // wait until context is canceled <-ctx.Done() - log.Infof("Shutting down pulse server") + log.Infof("Shutting down message queue event server") } // startSubscription initiates the subscription to resource status update messages. // It runs asynchronously in the background until the provided context is canceled. -func (s *MQTTEventServer) startSubscription(ctx context.Context) { +func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error { log.V(4).Infof("received action %s for resource %s", action, resource.ID) @@ -115,17 +114,17 @@ func (s *MQTTEventServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *MQTTEventServer) OnCreate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. -func (s *MQTTEventServer) OnUpdate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *MQTTEventServer) OnDelete(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } @@ -133,7 +132,7 @@ func (s *MQTTEventServer) OnDelete(ctx context.Context, resourceID string) error // It does two things: // 1. build the resource status and broadcast it to subscribers // 2. add the event instance record to mark the event has been processed by the current instance -func (s *MQTTEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { +func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { statusEvent, sErr := s.statusEventService.Get(ctx, eventID) if sErr != nil { return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 7385025e..2dee39e4 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -429,7 +429,7 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { // It does two things: // 1. build the resource status and broadcast it to subscribers // 2. add the event instance record to mark the event has been processed by the current instance -// TODO consider using a same way (pulse_server.OnStatusUpdate) to handle this +// TODO consider using a same way (MessageQueueEventServer.OnStatusUpdate) to handle this func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID) if sErr != nil { diff --git a/pkg/config/event_server.go b/pkg/config/event_server.go index 03abb2ff..87bc5f67 100644 --- a/pkg/config/event_server.go +++ b/pkg/config/event_server.go @@ -11,7 +11,7 @@ const ( BroadcastSubscriptionType SubscriptionType = "broadcast" ) -// EventServerConfig contains the configuration for the maestro pulse server. +// EventServerConfig contains the configuration for the message queue event server. type EventServerConfig struct { SubscriptionType string `json:"subscription_type"` ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"` @@ -45,8 +45,7 @@ func NewConsistentHashConfig() *ConsistentHashConfig { } // AddFlags configures the EventServerConfig with command line flags. -// It allows users to customize the interval for maestro instance pulses and subscription type. -// - "pulse-interval" sets the time between maestro instance pulses (in seconds) to indicate its liveness (default: 15 seconds). +// It allows users to customize the subscription type and ConsistentHashConfig settings. // - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast". // "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages. // "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it. diff --git a/test/helper.go b/test/helper.go index 9d4c353b..ef374d39 100755 --- a/test/helper.go +++ b/test/helper.go @@ -214,7 +214,7 @@ func (helper *Helper) sendShutdownSignal() error { func (helper *Helper) startEventServer(ctx context.Context) { // helper.Env().Config.EventServer.SubscriptionType = "broadcast" - helper.EventServer = server.NewMQTTEventServer(helper.EventBroadcaster, helper.StatusDispatcher) + helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher) go func() { klog.V(10).Info("Test event server started") helper.EventServer.Start(ctx) diff --git a/test/integration/status_hash_test.go b/test/integration/status_hash_test.go index 6873b5df..000f33aa 100644 --- a/test/integration/status_hash_test.go +++ b/test/integration/status_hash_test.go @@ -68,13 +68,13 @@ func TestEventServer(t *testing.T) { // the cluster1 name cannot be changed, because consistent hash makes it allocate to different instance. // the case here we want to the new consumer allocate to new instance(cluster1) which is a fake instance. - // after 3*pulseInterval (3s), it will relocate to maestro instance. + // after 3*heartbeatInterval (3s), it will relocate to maestro instance. clusterName := "cluster1" consumer := h.CreateConsumer(clusterName) // insert a new instance with the same name to consumer name // to make sure the consumer is hashed to the new instance firstly. - // after the new instance is stale after 3*pulseInterval (3s), the current + // after the new instance is stale after 3*heartbeatInterval (3s), the current // instance will take over the consumer and resync the resource status. _, err = instanceDao.Create(ctx, &api.ServerInstance{ Meta: api.Meta{