Skip to content

Commit

Permalink
Notify deadline exceeded error on subscription creation (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
int128 authored Dec 7, 2023
1 parent bc7b823 commit 14e527a
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 3 deletions.
4 changes: 4 additions & 0 deletions api/v1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ import (
// SubscriptionSpec defines the desired state of Subscription
type SubscriptionSpec struct {
// subscription ID
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
SubscriptionID string `json:"subscriptionID,omitempty"`

// project ID of subscription
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
SubscriptionProjectID string `json:"subscriptionProjectID,omitempty"`

// topic ID
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
TopicID string `json:"topicID,omitempty"`

// project ID of topic
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
TopicProjectID string `json:"topicProjectID,omitempty"`
}

Expand Down
2 changes: 2 additions & 0 deletions api/v1/topic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
// TopicSpec defines the desired state of Topic
type TopicSpec struct {
// ID of project
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
ProjectID string `json:"projectID,omitempty"`

// ID of topic
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
TopicID string `json:"topicID,omitempty"`
}

Expand Down
3 changes: 3 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"os"

"cloud.google.com/go/pubsub"
"k8s.io/utils/clock"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -106,6 +108,7 @@ func main() {
Scheme: mgr.GetScheme(),
NewClient: pubsub.NewClient,
Recorder: mgr.GetEventRecorderFor("subscription-controller"),
Clock: clock.RealClock{},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Subscription")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,27 @@ spec:
subscriptionID:
description: subscription ID
type: string
x-kubernetes-validations:
- message: Immutable field
rule: self == oldSelf
subscriptionProjectID:
description: project ID of subscription
type: string
x-kubernetes-validations:
- message: Immutable field
rule: self == oldSelf
topicID:
description: topic ID
type: string
x-kubernetes-validations:
- message: Immutable field
rule: self == oldSelf
topicProjectID:
description: project ID of topic
type: string
x-kubernetes-validations:
- message: Immutable field
rule: self == oldSelf
type: object
status:
description: SubscriptionStatus defines the observed state of Subscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ spec:
projectID:
description: ID of project
type: string
x-kubernetes-validations:
- message: Immutable field
rule: self == oldSelf
topicID:
description: ID of topic
type: string
x-kubernetes-validations:
- message: Immutable field
rule: self == oldSelf
type: object
status:
description: TopicStatus defines the observed state of Topic
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v0.28.4
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
sigs.k8s.io/controller-runtime v0.15.1
)

Expand Down Expand Up @@ -82,7 +83,6 @@ require (
k8s.io/component-base v0.27.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
13 changes: 11 additions & 2 deletions internal/controller/subscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -35,12 +36,15 @@ import (

const subscriptionFinalizerName = "subscription.googlecloudpubsuboperator.quipper.github.io/finalizer"

const subscriptionCreationWarningDeadline = 10 * time.Minute

// SubscriptionReconciler reconciles a Subscription object
type SubscriptionReconciler struct {
crclient.Client
Scheme *runtime.Scheme
NewClient newPubSubClientFunc
Recorder record.EventRecorder
Clock clock.PassiveClock
}

//+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=subscriptions,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -108,8 +112,13 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

r.Recorder.Event(&subscription, corev1.EventTypeWarning, "SubscriptionCreateError",
fmt.Sprintf("Failed to create Subscription in Pub/Sub: %s", err))
if r.Clock.Since(subscription.CreationTimestamp.Time) > subscriptionCreationWarningDeadline {
r.Recorder.Event(&subscription, corev1.EventTypeWarning, "SubscriptionCreateErrorDeadlineExceeded",
fmt.Sprintf("Failed to create Subscription for %s in Pub/Sub: %s", subscriptionCreationWarningDeadline, err))
} else {
r.Recorder.Event(&subscription, corev1.EventTypeNormal, "SubscriptionCreateError",
fmt.Sprintf("Failed to create Subscription in Pub/Sub: %s", err))
}
subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"path/filepath"
"testing"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
Expand All @@ -29,6 +30,7 @@ import (
"google.golang.org/api/option"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
clocktesting "k8s.io/utils/clock/testing"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
Expand Down Expand Up @@ -113,6 +115,8 @@ var _ = BeforeSuite(func() {
Scheme: k8sManager.GetScheme(),
NewClient: newClient,
Recorder: k8sManager.GetEventRecorderFor("subscription-controller"),
// TODO: how to change the time in test code?
Clock: clocktesting.NewFakePassiveClock(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

Expand Down

0 comments on commit 14e527a

Please sign in to comment.