Skip to content

Commit

Permalink
fix: clustertree bugs
Browse files Browse the repository at this point in the history
Signed-off-by: wangyizhi1 <[email protected]>
  • Loading branch information
wangyizhi1 committed Oct 29, 2023
1 parent 4e857b9 commit c5e2bb5
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 91 deletions.
35 changes: 20 additions & 15 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,30 +117,35 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("error starting %s: %v", clusterManager.ControllerName, err)
}

// add serviceExport controller
ServiceExportController := mcs.ServiceExportController{
RootClient: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
Logger: mgr.GetLogger(),
}
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
if opts.MultiClusterService {
// add serviceExport controller
ServiceExportController := mcs.ServiceExportController{
RootClient: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
Logger: mgr.GetLogger(),
}
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
}
}

GlobalDaemonSetService := &GlobalDaemonSetService{
opts: opts,
ctx: ctx,
defaultWorkNum: 1,
}
if err = GlobalDaemonSetService.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting global daemonset : %v", err)
if opts.DaemonSetController {
daemonSetController := &GlobalDaemonSetService{
opts: opts,
ctx: ctx,
defaultWorkNum: 1,
}
if err = daemonSetController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting global daemonset : %v", err)
}
}

// init rootPodController
RootPodReconciler := podcontrollers.RootPodReconciler{
GlobalLeafManager: globalleafManager,
RootClient: mgr.GetClient(),
DynamicRootClient: dynamicClient,
Options: opts,
}
if err := RootPodReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting RootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err)
Expand Down
10 changes: 7 additions & 3 deletions cmd/clustertree/cluster-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ const (
)

type Options struct {
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubernetesOptions KubernetesOptions
ListenPort int32
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubernetesOptions KubernetesOptions
ListenPort int32
DaemonSetController bool
MultiClusterService bool
}

type KubernetesOptions struct {
Expand All @@ -43,4 +45,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.")
flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.")
flags.Int32Var(&o.ListenPort, "listen-port", 10250, "Listen port for requests from the kube-apiserver.")
flags.BoolVar(&o.DaemonSetController, "daemonset-controller", false, "Turn on or off daemonset controller.")
flags.BoolVar(&o.MultiClusterService, "multi-cluster-service", false, "Turn on or off mcs support.")
}
54 changes: 26 additions & 28 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type ClusterController struct {
ControllerManagersLock sync.Mutex

RootResourceManager *utils.ResourceManager
mgr manager.Manager

GlobalLeafManager leafUtils.LeafResourceManager
}
Expand Down Expand Up @@ -116,9 +115,6 @@ func (c *ClusterController) SetupWithManager(mgr manager.Manager) error {
c.ManagerCancelFuncs = make(map[string]*context.CancelFunc)
c.ControllerManagers = make(map[string]manager.Manager)
c.Logger = mgr.GetLogger()

// TODO this may not be a good idea
c.mgr = mgr
return controllerruntime.NewControllerManagedBy(mgr).
Named(ControllerName).
WithOptions(controller.Options{}).
Expand Down Expand Up @@ -246,12 +242,13 @@ func (c *ClusterController) clearClusterControllers(cluster *clusterlinkv1alpha1
}

func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *clusterlinkv1alpha1.Cluster, node *corev1.Node, clientDynamic *dynamic.DynamicClient, leafClient kubernetes.Interface, kosmosClient kosmosversioned.Interface) error {
c.GlobalLeafManager.AddLeafResource(cluster.Name, &leafUtils.LeafResource{
nodeName := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name)
c.GlobalLeafManager.AddLeafResource(nodeName, &leafUtils.LeafResource{
Client: mgr.GetClient(),
DynamicClient: clientDynamic,
Clientset: leafClient,
NodeName: cluster.Name,
Namespace: cluster.Spec.Namespace,
NodeName: nodeName,
Namespace: "",
IgnoreLabels: strings.Split("", ","),
EnableServiceAccount: true,
})
Expand All @@ -271,23 +268,24 @@ func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *clust
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
}

serviceImportController := &mcs.ServiceImportController{
LeafClient: mgr.GetClient(),
RootClient: c.Root,
RootKosmosClient: kosmosClient,
EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName),
Logger: mgr.GetLogger(),
LeafNodeName: cluster.Name,
RootResourceManager: c.RootResourceManager,
}

if err := serviceImportController.AddController(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
if c.Options.MultiClusterService {
serviceImportController := &mcs.ServiceImportController{
LeafClient: mgr.GetClient(),
RootClient: c.Root,
RootKosmosClient: kosmosClient,
EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName),
Logger: mgr.GetLogger(),
LeafNodeName: nodeName,
RootResourceManager: c.RootResourceManager,
}
if err := serviceImportController.AddController(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
}
}

leafPodController := podcontrollers.LeafPodReconciler{
RootClient: c.Root,
Namespace: cluster.Spec.Namespace,
Namespace: "",
}

if err := leafPodController.SetupWithManager(mgr); err != nil {
Expand All @@ -313,34 +311,34 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, node *c
return fmt.Errorf("error starting leaf pvc controller %v", err)
}

leafPVontroller := pv.LeafPVController{
leafPVController := pv.LeafPVController{
LeafClient: mgr.GetClient(),
RootClient: c.Root,
RootClientSet: c.RootClient,
NodeName: node.Name,
}
if err := leafPVontroller.SetupWithManager(mgr); err != nil {
if err := leafPVController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting leaf pv controller %v", err)
}

return nil
}

func (c *ClusterController) createNode(ctx context.Context, cluster *clusterlinkv1alpha1.Cluster, leafClient kubernetes.Interface) (*corev1.Node, error) {
nodeName := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name)
serverVersion, err := leafClient.Discovery().ServerVersion()
if err != nil {
klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name)
klog.Errorf("create node failed, can not connect to leaf %s", nodeName)
return nil, err
}

node, err := c.RootClient.CoreV1().Nodes().Get(ctx, cluster.Name, metav1.GetOptions{})
node, err := c.RootClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("create node failed, can not get node %s", cluster.Name)
klog.Errorf("create node failed, can not get node %s", nodeName)
return nil, err
}

if err != nil && errors.IsNotFound(err) {
node = utils.BuildNodeTemplate(cluster)
node = utils.BuildNodeTemplate(nodeName)
node.Status.NodeInfo.KubeletVersion = serverVersion.GitVersion
node.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{
KubeletEndpoint: corev1.DaemonEndpoint{
Expand All @@ -349,7 +347,7 @@ func (c *ClusterController) createNode(ctx context.Context, cluster *clusterlink
}
node, err = c.RootClient.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
klog.Errorf("create node %s failed, err: %v", cluster.Name, err)
klog.Errorf("create node %s failed, err: %v", nodeName, err)
return nil, err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -74,11 +73,22 @@ func (dopt *rootDeleteOption) ApplyToDelete(opt *client.DeleteOptions) {
}

func NewRootDeleteOption(pod *corev1.Pod) client.DeleteOption {
gracePeriodSeconds := pod.DeletionGracePeriodSeconds
// TODO
//gracePeriodSeconds := pod.DeletionGracePeriodSeconds
//
//current := metav1.NewTime(time.Now())
//if pod.DeletionTimestamp.Before(&current) {
// gracePeriodSeconds = new(int64)
//}
return &rootDeleteOption{
GracePeriodSeconds: new(int64),
}
}

current := metav1.NewTime(time.Now())
if pod.DeletionTimestamp.Before(&current) {
gracePeriodSeconds = new(int64)
func NewLeafDeleteOption(pod *corev1.Pod) client.DeleteOption {
gracePeriodSeconds := new(int64)
if pod.DeletionGracePeriodSeconds != nil {
gracePeriodSeconds = pod.DeletionGracePeriodSeconds
}

return &rootDeleteOption{
Expand Down Expand Up @@ -124,7 +134,9 @@ func (r *LeafPodReconciler) SetupWithManager(mgr manager.Manager) error {
if len(r.Namespace) > 0 && r.Namespace != obj.GetNamespace() {
return false
}
return true

p := obj.(*corev1.Pod)
return podutils.IsKosmosPod(p)
}

return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -133,14 +145,14 @@ func (r *LeafPodReconciler) SetupWithManager(mgr manager.Manager) error {
For(&corev1.Pod{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
// ignore create event
return false
return skipFunc(createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
pod1 := updateEvent.ObjectOld.(*corev1.Pod)
pod2 := updateEvent.ObjectNew.(*corev1.Pod)
if !skipFunc(updateEvent.ObjectNew) {
return false
}
pod1 := updateEvent.ObjectOld.(*corev1.Pod)
pod2 := updateEvent.ObjectNew.(*corev1.Pod)
return !cmp.Equal(pod1.Status, pod2.Status)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
Expand Down
Loading

0 comments on commit c5e2bb5

Please sign in to comment.