Skip to content

Commit

Permalink
rename mqtt event server to also cover kafka.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 16, 2024
1 parent 0a5db1c commit d942ae2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 26 deletions.
13 changes: 5 additions & 8 deletions cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT, create a Pulse server to handle resource spec and status events.
var eventServer server.EventServer
switch environments.Environment().Config.MessageBroker.MessageBrokerType {
case "mqtt":
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
} else {
klog.Info("Setting up pulse server")
var statusDispatcher dispatcher.Dispatcher
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
Expand All @@ -64,12 +66,7 @@ func runServer(cmd *cobra.Command, args []string) {

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMQTTEventServer(eventBroadcaster, statusDispatcher)
case "grpc":
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
default:
klog.Errorf("Unsupported message broker type: %s", environments.Environment().Config.MessageBroker.MessageBrokerType)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
}

// Create the servers
Expand Down
33 changes: 16 additions & 17 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ type EventServer interface {
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error
}

var _ EventServer = &MQTTEventServer{}
var _ EventServer = &MessageQueueEventServer{}

// MQTTEventServer represents a server responsible for publish resource spec events from
// resource controller and handle resource status update events from the maestro agent.
// It also periodic heartbeat updates and checking the liveness of Maestro instances,
// triggering status resync based on instances' status and other conditions.
type MQTTEventServer struct {
// MessageQueueEventServer represents a event server responsible for publish resource spec events
// from resource controller and handle resource status update events from the message queue.
// It also maintains a status dispatcher to dispatch status update events to the corresponding
// maestro instances.
type MessageQueueEventServer struct {
instanceID string
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
Expand All @@ -58,9 +58,9 @@ type MQTTEventServer struct {
statusDispatcher dispatcher.Dispatcher
}

func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer {
func NewMessageQueueEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer {
sessionFactory := env().Database.SessionFactory
return &MQTTEventServer{
return &MessageQueueEventServer{
instanceID: env().Config.MessageBroker.ClientID,
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
Expand All @@ -72,10 +72,9 @@ func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatch
}
}

// Start initializes and runs the pulse server, updating and checking Maestro instances' liveness,
// initializes subscription to status update messages and triggers status resync based on
// instances' status and other conditions.
func (s *MQTTEventServer) Start(ctx context.Context) {
// Start initializes and runs the event server. It starts the subscription
// to resource status update messages and the status dispatcher.
func (s *MessageQueueEventServer) Start(ctx context.Context) {
log.Infof("Starting pulse server")

// start subscribing to resource status update messages.
Expand All @@ -90,7 +89,7 @@ func (s *MQTTEventServer) Start(ctx context.Context) {

// startSubscription initiates the subscription to resource status update messages.
// It runs asynchronously in the background until the provided context is canceled.
func (s *MQTTEventServer) startSubscription(ctx context.Context) {
func (s *MessageQueueEventServer) startSubscription(ctx context.Context) {
s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error {
log.V(4).Infof("received action %s for resource %s", action, resource.ID)

Expand All @@ -115,25 +114,25 @@ func (s *MQTTEventServer) startSubscription(ctx context.Context) {
}

// OnCreate will be called on each new resource creation event inserted into db.
func (s *MQTTEventServer) OnCreate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnCreate(ctx, resourceID)
}

// OnUpdate will be called on each new resource update event inserted into db.
func (s *MQTTEventServer) OnUpdate(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error {
return s.sourceClient.OnUpdate(ctx, resourceID)
}

// OnDelete will be called on each new resource deletion event inserted into db.
func (s *MQTTEventServer) OnDelete(ctx context.Context, resourceID string) error {
func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error {
return s.sourceClient.OnDelete(ctx, resourceID)
}

// On StatusUpdate will be called on each new status event inserted into db.
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *MQTTEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (helper *Helper) sendShutdownSignal() error {

func (helper *Helper) startEventServer(ctx context.Context) {
// helper.Env().Config.EventServer.SubscriptionType = "broadcast"
helper.EventServer = server.NewMQTTEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
go func() {
klog.V(10).Info("Test event server started")
helper.EventServer.Start(ctx)
Expand Down

0 comments on commit d942ae2

Please sign in to comment.