diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go index 595fd5036d0e1..6abd18d8309ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go @@ -503,6 +503,10 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime) } + // send bookmark after sending all events in cacheInterval for watchlist request + if cacheInterval.initialEventsEndBookmark != nil { + c.sendWatchCacheEvent(cacheInterval.initialEventsEndBookmark) + } c.process(ctx, resourceVersion) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 48791bd7b63b9..716b5cc38e7cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -653,6 +653,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return newErrWatcher(err), nil } + c.setInitialEventsEndBookmarkIfRequested(cacheInterval, opts, c.watchCache.resourceVersion) + addedWatcher := false func() { c.Lock() @@ -1439,6 +1441,26 @@ func (c *Cacher) Wait(ctx context.Context) error { return c.ready.wait(ctx) } +// setInitialEventsEndBookmarkIfRequested sets initialEventsEndBookmark field in watchCacheInterval for watchlist request +func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCacheInterval, opts storage.ListOptions, currentResourceVersion uint64) { + if opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks { + // We don't need to set the InitialEventsAnnotation for this bookmark event, + // because this will be automatically set during event conversion in cacheWatcher.convertToWatchEvent method + initialEventsEndBookmark := &watchCacheEvent{ + Type: watch.Bookmark, + Object: c.newFunc(), + ResourceVersion: currentResourceVersion, + } + + if err := c.versioner.UpdateObject(initialEventsEndBookmark.Object, initialEventsEndBookmark.ResourceVersion); err != nil { + klog.Errorf("failure to set resourceVersion to %d on initialEventsEndBookmark event %+v for watchlist request and wait for bookmark trigger to send", initialEventsEndBookmark.ResourceVersion, initialEventsEndBookmark.Object) + initialEventsEndBookmark = nil + } + + cacheInterval.initialEventsEndBookmark = initialEventsEndBookmark + } +} + // errWatcher implements watch.Interface to return a single error type errWatcher struct { result chan watch.Event diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index fa7257f216d65..5e87fa567667e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -49,6 +49,7 @@ import ( "k8s.io/apiserver/pkg/storage/cacher/metrics" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcdfeature "k8s.io/apiserver/pkg/storage/feature" + storagetesting "k8s.io/apiserver/pkg/storage/testing" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" k8smetrics "k8s.io/component-base/metrics" @@ -1166,6 +1167,106 @@ func TestCacherSendBookmarkEvents(t *testing.T) { } } +func TestInitialEventsEndBookmark(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true) + forceRequestWatchProgressSupport(t) + + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + } + + makePod := func(index uint64) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", index), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%v", 100+index), + }, + } + } + + numberOfPods := 3 + var expectedPodEvents []watch.Event + for i := 1; i <= numberOfPods; i++ { + pod := makePod(uint64(i)) + if err := cacher.watchCache.Add(pod); err != nil { + t.Fatalf("failed to add a pod: %v", err) + } + expectedPodEvents = append(expectedPodEvents, watch.Event{Type: watch.Added, Object: pod}) + } + var currentResourceVersion uint64 = 100 + 3 + + trueVal, falseVal := true, false + + scenarios := []struct { + name string + allowWatchBookmarks bool + sendInitialEvents *bool + }{ + { + name: "allowWatchBookmarks=false, sendInitialEvents=false", + allowWatchBookmarks: false, + sendInitialEvents: &falseVal, + }, + { + name: "allowWatchBookmarks=false, sendInitialEvents=true", + allowWatchBookmarks: false, + sendInitialEvents: &trueVal, + }, + { + name: "allowWatchBookmarks=true, sendInitialEvents=true", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + }, + { + name: "allowWatchBookmarks=true, sendInitialEvents=false", + allowWatchBookmarks: true, + sendInitialEvents: &falseVal, + }, + { + name: "allowWatchBookmarks=false, sendInitialEvents=nil", + allowWatchBookmarks: true, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + expectedWatchEvents := expectedPodEvents + if scenario.allowWatchBookmarks && scenario.sendInitialEvents != nil && *scenario.sendInitialEvents { + expectedWatchEvents = append(expectedWatchEvents, watch.Event{ + Type: watch.Bookmark, + Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: strconv.FormatUint(currentResourceVersion, 10), + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }, + }, + }) + } + + pred := storage.Everything + pred.AllowWatchBookmarks = scenario.allowWatchBookmarks + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", SendInitialEvents: scenario.sendInitialEvents, Predicate: pred}) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents) + storagetesting.TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) + }) + } +} + func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index fa7d38946860e..89a6b5e95d9d6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -98,6 +98,9 @@ type watchCacheInterval struct { // Given that indexer and indexValidator only read state, if // possible, Locker obtained through RLocker() is provided. lock sync.Locker + + // initialEventsEndBookmark will be sent after sending all events in cacheInterval + initialEventsEndBookmark *watchCacheEvent } type attrFunc func(runtime.Object) (labels.Set, fields.Set, error) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index e30f086184a87..0f19048d670dc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -169,6 +169,36 @@ func testCheckResultFunc(t *testing.T, w watch.Interface, check func(actualEvent } } +func testCheckResultWithIgnoreFunc(t *testing.T, w watch.Interface, expectedEvents []watch.Event, ignore func(watch.Event) bool) { + checkIndex := 0 + for { + select { + case event := <-w.ResultChan(): + obj := event.Object + if co, ok := obj.(runtime.CacheableObject); ok { + event.Object = co.GetObject() + } + if ignore != nil && ignore(event) { + continue + } + if checkIndex < len(expectedEvents) { + expectNoDiff(t, "incorrect event", expectedEvents[checkIndex], event) + checkIndex++ + } else { + t.Fatalf("cannot receive correct event, expect no event, but get a event: %+v", event) + } + case <-time.After(100 * time.Millisecond): + // wait 100ms forcibly in order to receive watchEvents including bookmark event. + // we cannot guarantee that we will receive all bookmark events within 100ms, + // but too large timeout value will lead to exceed the timeout of package test. + if checkIndex < len(expectedEvents) { + t.Fatalf("cannot receive enough events within specific time, rest expected events: %+v", expectedEvents[checkIndex:]) + } + return + } + } +} + func testCheckStop(t *testing.T, w watch.Interface) { select { case e, ok := <-w.ResultChan(): @@ -187,16 +217,18 @@ func testCheckStop(t *testing.T, w watch.Interface) { } } -func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) { +func TestCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) { for _, expectedEvent := range expectedEvents { testCheckResult(t, w, expectedEvent) } } -func testCheckNoMoreResults(t *testing.T, w watch.Interface) { +func TestCheckNoMoreResultsWithIgnoreFunc(t *testing.T, w watch.Interface, ignore func(watch.Event) bool) { select { case e := <-w.ResultChan(): - t.Errorf("Unexpected: %#v event received, expected no events", e) + if ignore == nil || !ignore(e) { + t.Errorf("Unexpected: %#v event received, expected no events", e) + } // We consciously make the timeout short here to speed up tests. case <-time.After(100 * time.Millisecond): return diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index dee88a0e61d6b..ad4bab27496f2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -1478,7 +1478,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac defer w.Stop() // make sure we only get initial events - testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods)) + TestCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods)) // make sure that the actual bookmark has at least RV >= to the expected one if scenario.expectedInitialEventsBookmarkWithMinimalRV != nil { @@ -1512,8 +1512,9 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac require.NoError(t, err, "failed to add a pod: %v") createdPods = append(createdPods, out) } - testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods)) - testCheckNoMoreResults(t, w) + ignoreEventsFn := func(event watch.Event) bool { return event.Type == watch.Bookmark } + testCheckResultWithIgnoreFunc(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods), ignoreEventsFn) + TestCheckNoMoreResultsWithIgnoreFunc(t, w, ignoreEventsFn) }) } } @@ -1567,8 +1568,63 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st // make sure we only get initial events from the first ns // followed by the bookmark with the global RV - testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion)) - testCheckNoMoreResults(t, w) + TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion)) + TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) +} + +func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) { + trueVal := true + expectedInitialEventsInStrictOrder := func(initialPod *example.Pod, globalResourceVersion string) []watch.Event { + watchEvents := []watch.Event{} + watchEvents = append(watchEvents, watch.Event{Type: watch.Added, Object: initialPod}) + watchEvents = append(watchEvents, watch.Event{Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: globalResourceVersion, + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }, + }}) + return watchEvents + } + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true) + + // add the pod for which the field selector will be constructed + ns := "ns-foo" + expectedPod := &example.Pod{} + initialPod := makePod("1") + initialPod.Namespace = ns + err := store.Create(ctx, computePodKey(initialPod), initialPod, expectedPod, 0) + require.NoError(t, err, "failed to add a pod: %v") + + // add more pods that won't match the field selector + lastAddedPod := &example.Pod{} + for _, otherPod := range []*example.Pod{makePod("2"), makePod("3"), makePod("4"), makePod("5")} { + otherPod.Namespace = ns + err = store.Create(ctx, computePodKey(otherPod), otherPod, lastAddedPod, 0) + require.NoError(t, err, "failed to add a pod: %v") + } + + opts := storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name=pod-1"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + }, + Recursive: true, + } + opts.SendInitialEvents = &trueVal + opts.Predicate.AllowWatchBookmarks = true + + w, err := store.Watch(context.Background(), "/pods", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // make sure we only get a single pod matching the field selector + // followed by the bookmark with the global RV + TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion)) + TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) } func makePod(namePrefix string) *example.Pod {