Skip to content

Commit

Permalink
fix: print all the resources during watch in scenarios involving mult…
Browse files Browse the repository at this point in the history
…iple replicas (#1293)
  • Loading branch information
Yangyang96 authored Sep 23, 2024
1 parent 4318f92 commit 0b7337c
Showing 1 changed file with 136 additions and 58 deletions.
194 changes: 136 additions & 58 deletions pkg/engine/runtime/kubernetes/kubernetes_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"strings"
"time"

jsonpatch "github.com/evanphx/json-patch"
yamlv2 "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -46,6 +47,12 @@ type KubernetesRuntime struct {
mapper meta.RESTMapper
}

// KubernetesWatchEvent is a wrapper of k8swatch.Event
type KubernetesWatchEvent struct {
Event <-chan k8swatch.Event
Resource *unstructured.Unstructured
}

// NewKubernetesRuntime create a new KubernetesRuntime
func NewKubernetesRuntime(spec apiv1.Spec) (runtime.Runtime, error) {
client, mapper, err := getKubernetesClient(spec)
Expand Down Expand Up @@ -334,19 +341,19 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
if err != nil {
return &runtime.WatchResponse{Status: v1.NewErrorStatus(err)}
}
rootCh := doWatch(ctx, w, func(watched *unstructured.Unstructured) bool {
events := doWatch(ctx, w, 1, func(watched *unstructured.Unstructured) bool {
return watched.GetName() == reqObj.GetName()
})

if rootCh == nil {
if len(events) == 0 {
return &runtime.WatchResponse{Status: v1.NewErrorStatus(fmt.Errorf("failed to get the root channel for watching %s",
request.Resource.ResourceKey()))}
}

// Collect all
watchers := runtime.NewWatchers()
watchers.Insert(engine.BuildIDForKubernetes(reqObj), rootCh)

// reqObj has only one watch event
watchers.Insert(engine.BuildIDForKubernetes(reqObj), events[0].Event)
if reqObj.GetKind() == convertor.Service { // Watch Endpoints or EndpointSlice
if gvk, err := k.mapper.KindFor(schema.GroupVersionResource{
Group: discoveryv1.SchemeGroupVersion.Group,
Expand All @@ -355,39 +362,64 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
}); gvk.Empty() || err != nil { // Watch Endpoints
log.Errorf("k8s runtime has no kind for EndpointSlice, err: %v", err)
namedGVK := getNamedGVK(reqObj.GroupVersionKind())
ch, dependent, err := k.WatchByRelation(ctx, reqObj, namedGVK, namedBy)
events, err = k.WatchByRelation(ctx, reqObj, namedGVK, 1, namedBy)
if err != nil {
return &runtime.WatchResponse{Status: v1.NewErrorStatus(err)}
}
watchers.Insert(engine.BuildIDForKubernetes(dependent), ch)

// Endpoints has only one watch event
watchers.Insert(engine.BuildIDForKubernetes(events[0].Resource), events[0].Event)
} else { // Watch EndpointSlice
dependentGVK := getDependentGVK(reqObj.GroupVersionKind())
ch, dependent, err := k.WatchByRelation(ctx, reqObj, dependentGVK, ownedBy)
events, err = k.WatchByRelation(ctx, reqObj, dependentGVK, 1, ownedBy)
if err != nil {
return &runtime.WatchResponse{Status: v1.NewErrorStatus(err)}
}
watchers.Insert(engine.BuildIDForKubernetes(dependent), ch)

// EndpointSlice has only one watch event
watchers.Insert(engine.BuildIDForKubernetes(events[0].Resource), events[0].Event)
}
} else if reqObj.GetKind() == convertor.Namespace { // Watch Namespace's default serviceAccount
dependentGVK := getDependentGVK(reqObj.GroupVersionKind())
events, err = k.WatchByRelation(ctx, reqObj, dependentGVK, 1, isDefaultServiceAccount)
if err != nil {
return &runtime.WatchResponse{Status: v1.NewErrorStatus(err)}
}
// default serviceAccount has only one watch event
watchers.Insert(engine.BuildIDForKubernetes(events[0].Resource), events[0].Event)
} else {
depth := 1
// default amount of dependent resource
amount := 1
// Try to get dependent resource by owner reference
dependentGVK := getDependentGVK(reqObj.GroupVersionKind())
if !dependentGVK.Empty() {
owner := reqObj
for !dependentGVK.Empty() {
ch, dependent, err := k.WatchByRelation(ctx, owner, dependentGVK, ownedBy)
// Get amount of dependent resource if it start to watch the maximum depth
// For instance, Deployments, StatefulSets have more than 1 replica, Cronjobs have more than 1 completion
if depth > 1 {
amount = getDependentAmount(reqObj)
}
// Get all the watch events in this depth
events, err := k.WatchByRelation(ctx, owner, dependentGVK, amount, ownedBy)
if err != nil {
return &runtime.WatchResponse{Status: v1.NewErrorStatus(err)}
}

if dependent == nil {
if len(events) == 0 {
break
}
watchers.Insert(engine.BuildIDForKubernetes(dependent), ch)
// Send all the watch events
for _, event := range events {
watchers.Insert(engine.BuildIDForKubernetes(event.Resource), event.Event)
}

// Try to get deeper, max depth is 3 including root
dependentGVK = getDependentGVK(dependent.GroupVersionKind())
// Replace owner
owner = dependent
dependentGVK = getDependentGVK(events[0].Resource.GroupVersionKind())
// Replace owner and increase depth
owner = events[0].Resource
depth += 1
}
}
}
Expand Down Expand Up @@ -543,68 +575,58 @@ func convertString2Unstructured(yamlContent []byte) (*unstructured.Unstructured,
return obj, gvk, nil
}

// WatchBySelector watch resources by gvk and filter by selector
func (k *KubernetesRuntime) WatchBySelector(
ctx context.Context,
o *unstructured.Unstructured,
gvk schema.GroupVersionKind,
id, labelStr string,
) (<-chan k8swatch.Event, error) {
clientForResource, err := buildDynamicResource(k.client, k.mapper, &gvk, id, o.GetNamespace())
if err != nil {
return nil, err
}

w, err := clientForResource.Watch(ctx, metav1.ListOptions{LabelSelector: labelStr})
if err != nil {
return nil, err
}

return doWatch(ctx, w, nil), nil
}

// WatchByRelation watched resources by giving gvk if related() return true
func (k *KubernetesRuntime) WatchByRelation(
ctx context.Context,
cur *unstructured.Unstructured,
gvk schema.GroupVersionKind,
amount int,
related func(watched, cur *unstructured.Unstructured) bool,
) (<-chan k8swatch.Event, *unstructured.Unstructured, error) {
) ([]KubernetesWatchEvent, error) {
mapping, err := k.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, nil, err
return nil, err
}

clientForResource := k.client.Resource(mapping.Resource).Namespace(cur.GetNamespace())
w, err := clientForResource.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, nil, err
return nil, err
}

var next *unstructured.Unstructured

eventCh := doWatch(ctx, w, func(watched *unstructured.Unstructured) bool {
// Get all the watch events of the dependents of a resource
events := doWatch(ctx, w, amount, func(watched *unstructured.Unstructured) bool {
ok := related(watched, cur)
if ok {
next = watched
}
return ok
})
if eventCh == nil {
if len(events) == 0 {
err = fmt.Errorf("failed to get the event channel for watching related resources of %s with kind of %s",
cur.GetName(), cur.GetKind())
}

return eventCh, next, err
return events, err
}

// doWatch send watched object if check ok
func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watched *unstructured.Unstructured) bool) <-chan k8swatch.Event {
// Buffered channel, store new event
resultCh := make(chan k8swatch.Event, 1)
// Wait for the first watched obj
first := true
// doWatch send KubernetesWatchEvent Slice which contains all the watch channels and their belonging resources if check ok
func doWatch(ctx context.Context, watcher k8swatch.Interface, amount int, checker func(watched *unstructured.Unstructured) bool) []KubernetesWatchEvent {
events := []KubernetesWatchEvent{}
eventMap := make(map[string]chan k8swatch.Event) // eventMap to store the watch channels to use the same watch channel for each resource
signal := make(chan struct{})
timeoutCh := make(chan struct{})
first := false
// No resource amount is specified
if amount == -1 {
go func() {
// Wait for 2 seconds after receiving the first event
for {
if first {
log.Infof("dependent amount is not specified, will return after timeout")
time.Sleep(2 * time.Second)
timeoutCh <- struct{}{}
}
}
}()
}

go func() {
defer watcher.Stop()
for {
Expand All @@ -614,12 +636,26 @@ func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watch
if !ok {
break
}
if !first {
first = true
}
// Check
if checker == nil || checker != nil && checker(dependent) {
resultCh <- e
if first {
signal <- struct{}{}
first = false
resName := dependent.GetName()
ch, ok := eventMap[resName]
if ok && ch != nil {
ch <- e
} else {
// Buffered channel, put new event into the event map
ch := make(chan k8swatch.Event, 1)
ch <- e
eventMap[resName] = ch
// Add channels to be consumed during watching
events = append(events, KubernetesWatchEvent{Event: ch, Resource: dependent})
// Got enough events
if len(events) == amount {
signal <- struct{}{}
}
}
}
case <-ctx.Done():
Expand All @@ -628,15 +664,25 @@ func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watch
}
}()

// Owner&Dependent check pass, return the dependent Obj
// Return the watch events channels and their belonging resources
select {
// Got enough events
case <-signal:
return resultCh
return events
// Context done
case <-ctx.Done():
return nil
// No specified amount of events, return after timeout
case <-timeoutCh:
return events
}
}

// Judge dependent is default service account
func isDefaultServiceAccount(dependent, owner *unstructured.Unstructured) bool {
return dependent.GetName() == "default" && dependent.GetNamespace() == owner.GetName()
}

// Judge dependent is owned by owner
func ownedBy(dependent, owner *unstructured.Unstructured) bool {
// Parse dependent's metadata.ownerReferences
Expand Down Expand Up @@ -706,6 +752,13 @@ func getDependentGVK(gvk schema.GroupVersionKind) schema.GroupVersionKind {
Version: discoveryv1.SchemeGroupVersion.Version,
Kind: convertor.EndpointSlice,
}
// Namespace is the owner of ServiceAccount
case convertor.Namespace:
return schema.GroupVersionKind{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Kind: convertor.ServiceAccount,
}
default:
return schema.GroupVersionKind{}
}
Expand All @@ -724,6 +777,31 @@ func getNamedGVK(gvk schema.GroupVersionKind) schema.GroupVersionKind {
}
}

// Get the amount of dependent resources of the owner
func getDependentAmount(object *unstructured.Unstructured) int {
gvk := object.GroupVersionKind()

switch gvk.Kind {
case convertor.Deployment:
if replicas, found, err := unstructured.NestedInt64(object.Object, "spec", "replicas"); found && err == nil {
return int(replicas)
}
return 1
case convertor.StatefulSet:
if replicas, found, err := unstructured.NestedInt64(object.Object, "spec", "replicas"); found && err == nil {
return int(replicas)
}
return 1
case convertor.CronJob:
if completions, found, err := unstructured.NestedInt64(object.Object, "spec", "jobTemplate", "spec", "completions"); found && err == nil {
return int(completions)
}
return 1
default:
return -1
}
}

func validateResourceID(id string, gvk *schema.GroupVersionKind) error {
keys := strings.Split(id, engine.Separator)
if len(keys) < 2 {
Expand Down

0 comments on commit 0b7337c

Please sign in to comment.