Skip to content

Commit

Permalink
Add finalizer to clean up topic (#45)
Browse files Browse the repository at this point in the history
* Add finalizer to clean up topic

* Add finalizer logic
  • Loading branch information
int128 authored Feb 2, 2023
1 parent 7d17cf3 commit f0b10d1
Showing 1 changed file with 52 additions and 1 deletion.
53 changes: 52 additions & 1 deletion controllers/topic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1"
Expand All @@ -36,6 +37,8 @@ type TopicReconciler struct {
Scheme *runtime.Scheme
}

const topicFinalizerName = "topic.googlecloudpubsuboperator.quipper.github.io/finalizer"

//+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=topics,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=topics/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=topics/finalizers,verbs=update
Expand All @@ -51,9 +54,40 @@ func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

logger.Info("Found the topic", "topic", topic)

// examine DeletionTimestamp to determine if object is under deletion
if topic.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(&topic, topicFinalizerName) {
controllerutil.AddFinalizer(&topic, topicFinalizerName)
if err := r.Update(ctx, &topic); err != nil {
return ctrl.Result{}, err
}
}
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(&topic, topicFinalizerName) {
// our finalizer is present, so lets handle any external dependency
if err := deleteTopic(ctx, topic.Spec.ProjectID, topic.Spec.TopicID); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return ctrl.Result{}, err
}

// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(&topic, topicFinalizerName)
if err := r.Update(ctx, &topic); err != nil {
return ctrl.Result{}, err
}
}

// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
}

t, err := createTopic(ctx, topic.Spec.ProjectID, topic.Spec.TopicID)
if err != nil {
if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.AlreadyExists {
Expand Down Expand Up @@ -91,3 +125,20 @@ func createTopic(ctx context.Context, projectID, topicID string) (*pubsub.Topic,

return t, nil
}

func deleteTopic(ctx context.Context, projectID, topicID string) error {
c, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer c.Close()

if err := c.Topic(topicID).Delete(ctx); err != nil {
if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.NotFound {
// for idempotent
return nil
}
return fmt.Errorf("unable to delete topic %s: %w", topicID, err)
}
return nil
}

0 comments on commit f0b10d1

Please sign in to comment.