Skip to content

Commit

Permalink
Add subscription status (#133)
Browse files Browse the repository at this point in the history
* Add subscription status

* Generated by GitHub Actions (go / generate)

https://github.com/quipper/google-cloud-pubsub-operator/actions/runs/6033848028

* Remove `CreateSubscriptionErrorInjectionReactor()`

---------

Co-authored-by: update-generated-files-action <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
int128 and github-actions[bot] authored Sep 21, 2023
1 parent 83f0254 commit b8b9eb3
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 11 deletions.
10 changes: 8 additions & 2 deletions api/v1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ type SubscriptionSpec struct {

// SubscriptionStatus defines the observed state of Subscription
type SubscriptionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Phase SubscriptionStatusPhase `json:"phase,omitempty"`
}

type SubscriptionStatusPhase string

const (
SubscriptionStatusPhaseActive SubscriptionStatusPhase = "Active"
SubscriptionStatusPhaseError SubscriptionStatusPhase = "Error"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
type: object
status:
description: SubscriptionStatus defines the observed state of Subscription
properties:
phase:
type: string
type: object
type: object
served: true
Expand Down
25 changes: 22 additions & 3 deletions internal/controller/subscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
// on deleted requests.
return ctrl.Result{}, crclient.IgnoreNotFound(err)
}

logger.Info("Found the subscription", "subscription", subscription)
logger.Info("Found the subscription resource")

// examine DeletionTimestamp to determine if object is under deletion
if subscription.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -95,14 +94,34 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
if isPubSubAlreadyExistsError(err) {
// don't treat as error
logger.Info("PubSub subscription already exists")
subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Active")
return ctrl.Result{}, nil
}

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Error")
return ctrl.Result{}, err
}

logger.Info(fmt.Sprintf("Subscription created: %v", s.ID()), "subscription", subscription)

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Active")
return ctrl.Result{}, nil
}

Expand Down
45 changes: 40 additions & 5 deletions internal/controller/subscription_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,31 @@ import (
googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1"
"github.com/quipper/google-cloud-pubsub-operator/internal/pubsubtest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
//+kubebuilder:scaffold:imports
)

var _ = Describe("Subscription controller", func() {
Context("When creating a Subscription resource", func() {
const projectID = "subscription-project"
It("Should create a Pub/Sub Subscription", func(ctx context.Context) {
const projectID = "subscription-project-1"
psClient, err := pubsubtest.NewClient(ctx, projectID, psServer)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Topic")
topicID := "my-topic"
_, err = psClient.CreateTopic(ctx, topicID)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Subscription")
topic := &googlecloudpubsuboperatorv1.Subscription{
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
Name: "example",
Namespace: "default",
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
Expand All @@ -40,7 +42,7 @@ var _ = Describe("Subscription controller", func() {
TopicID: topicID,
},
}
Expect(k8sClient.Create(ctx, topic)).Should(Succeed())
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())

By("Checking if the Subscription exists")
Eventually(func(g Gomega) {
Expand All @@ -49,5 +51,38 @@ var _ = Describe("Subscription controller", func() {
g.Expect(subscriptionExists).Should(BeTrue())
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "subscription-project-2"

By("Creating a Subscription")
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
SubscriptionID: "my-subscription",
// CreateSubscription API should fail because the topic does not exist.
// We don't need to explicitly inject an error.
TopicProjectID: projectID,
TopicID: "invalid-topic",
},
}
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())
subscriptionRef := types.NamespacedName{Namespace: subscription.Namespace, Name: subscription.Name}

By("Checking if the status is Error")
Eventually(func(g Gomega) {
var subscription googlecloudpubsuboperatorv1.Subscription
g.Expect(k8sClient.Get(ctx, subscriptionRef, &subscription)).Should(Succeed())
g.Expect(subscription.Status.Phase).Should(Equal(googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError))
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})
})
})
2 changes: 1 addition & 1 deletion internal/controller/topic_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ = Describe("Topic controller", func() {
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "error-injected-project-1"
const projectID = "error-injected-project-topic-1"

By("Creating a Topic")
topic := &googlecloudpubsuboperatorv1.Topic{
Expand Down

0 comments on commit b8b9eb3

Please sign in to comment.