Merge pull request #10849 from cahillsf/populate-mpm-status-version
✨ Set Kubernetes version in machinepool machine Status.Version
k8s-ci-robot authored Jul 22, 2024
2 parents 248ce59 + 12d8855 commit 5a04c4d
Showing 6 changed files with 411 additions and 87 deletions.
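In effect, each Machine created for a MachinePool now has its Spec.Version populated from the kubelet version reported by the Node backing it, instead of being left empty. A minimal standalone sketch of that derivation (the versionFromNode helper name is illustrative, not part of this diff):

package sketch

import (
	corev1 "k8s.io/api/core/v1"
)

// versionFromNode mirrors the logic this PR adds to computeDesiredMachine:
// use the kubelet's reported version when a backing Node exists and reports
// one, otherwise leave the Machine's version unset.
func versionFromNode(node *corev1.Node) *string {
	if node != nil && node.Status.NodeInfo.KubeletVersion != "" {
		return &node.Status.NodeInfo.KubeletVersion // e.g. "v1.30.2"
	}
	return nil
}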
29 changes: 25 additions & 4 deletions exp/internal/controllers/machinepool_controller.go
@@ -80,6 +80,21 @@ type MachinePoolReconciler struct {
externalTracker external.ObjectTracker
}

+ // scope holds the different objects that are read and used during the reconcile.
+ type scope struct {
+ // cluster is the Cluster object the MachinePool belongs to.
+ // It is set at the beginning of the reconcile function.
+ cluster *clusterv1.Cluster
+
+ // machinePool is the MachinePool object. It is set at the beginning
+ // of the reconcile function.
+ machinePool *expv1.MachinePool
+
+ // nodeRefMap is a map of providerIDs to Nodes that are associated with the Cluster.
+ // It is set after reconcileInfrastructure is called.
+ nodeRefMap map[string]*corev1.Node
+ }

func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
clusterToMachinePools, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &expv1.MachinePoolList{}, mgr.GetScheme())
if err != nil {
@@ -210,7 +225,11 @@ func (r *MachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// Handle normal reconciliation loop.
- res, err := r.reconcile(ctx, cluster, mp)
+ scope := &scope{
+ cluster: cluster,
+ machinePool: mp,
+ }
+ res, err := r.reconcile(ctx, scope)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
@@ -220,16 +239,18 @@ func (r *MachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return res, err
}

- func (r *MachinePoolReconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) {
+ func (r *MachinePoolReconciler) reconcile(ctx context.Context, s *scope) (ctrl.Result, error) {
// Ensure the MachinePool is owned by the Cluster it belongs to.
+ cluster := s.cluster
+ mp := s.machinePool
mp.SetOwnerReferences(util.EnsureOwnerRef(mp.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: clusterv1.GroupVersion.String(),
Kind: "Cluster",
Name: cluster.Name,
UID: cluster.UID,
}))

- phases := []func(context.Context, *clusterv1.Cluster, *expv1.MachinePool) (ctrl.Result, error){
+ phases := []func(context.Context, *scope) (ctrl.Result, error){
r.reconcileBootstrap,
r.reconcileInfrastructure,
r.reconcileNodeRefs,
@@ -239,7 +260,7 @@ func (r *MachinePoolReconciler) reconcile(ctx context.Context, cluster *clusterv
errs := []error{}
for _, phase := range phases {
// Call the inner reconciliation methods.
- phaseResult, err := phase(ctx, cluster, mp)
+ phaseResult, err := phase(ctx, s)
if err != nil {
errs = append(errs, err)
}
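Every reconcile phase now shares the signature func(context.Context, *scope) (ctrl.Result, error), so the loop above can run all phases and report their failures together. A standalone sketch of that pattern (simplified: the requeue merging here approximates cluster-api's LowestNonZeroResult helper):

package sketch

import (
	"context"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
	ctrl "sigs.k8s.io/controller-runtime"
)

// scope stands in for the controller's scope type above.
type scope struct{}

func runPhases(ctx context.Context, s *scope, phases []func(context.Context, *scope) (ctrl.Result, error)) (ctrl.Result, error) {
	res := ctrl.Result{}
	errs := []error{}
	for _, phase := range phases {
		phaseResult, err := phase(ctx, s)
		if err != nil {
			errs = append(errs, err)
		}
		// Keep the most urgent requeue: the smallest non-zero RequeueAfter wins.
		if phaseResult.RequeueAfter > 0 && (res.RequeueAfter == 0 || phaseResult.RequeueAfter < res.RequeueAfter) {
			res.RequeueAfter = phaseResult.RequeueAfter
		}
	}
	// NewAggregate drops nil entries and returns nil when nothing failed.
	return res, kerrors.NewAggregate(errs)
}

The same nil-dropping behavior of kerrors.NewAggregate is what makes the aggregate call in reconcileInfrastructure below safe: errors.Wrapf from github.com/pkg/errors returns nil for a nil cause, so only the calls that actually failed contribute to the aggregate.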
37 changes: 12 additions & 25 deletions exp/internal/controllers/machinepool_controller_noderef.go
@@ -46,8 +46,10 @@ type getNodeReferencesResult struct {
ready int
}

- func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) {
+ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, s *scope) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
+ cluster := s.cluster
+ mp := s.machinePool

// Create a watch on the nodes in the Cluster.
if err := r.watchClusterNodes(ctx, cluster); err != nil {
@@ -81,8 +83,13 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *
return ctrl.Result{}, err
}

+ // Return early if nodeRefMap is nil.
+ if s.nodeRefMap == nil {
+ return ctrl.Result{}, errors.New("failed to get node references")
+ }

// Get the Node references.
- nodeRefsResult, err := r.getNodeReferences(ctx, clusterClient, mp.Spec.ProviderIDList, mp.Spec.MinReadySeconds)
+ nodeRefsResult, err := r.getNodeReferences(ctx, mp.Spec.ProviderIDList, mp.Spec.MinReadySeconds, s.nodeRefMap)
if err != nil {
if err == errNoAvailableNodes {
log.Info("Cannot assign NodeRefs to MachinePool, no matching Nodes")
@@ -153,30 +160,10 @@ func (r *MachinePoolReconciler) deleteRetiredNodes(ctx context.Context, c client
return nil
}

- func (r *MachinePoolReconciler) getNodeReferences(ctx context.Context, c client.Client, providerIDList []string, minReadySeconds *int32) (getNodeReferencesResult, error) {
+ func (r *MachinePoolReconciler) getNodeReferences(ctx context.Context, providerIDList []string, minReadySeconds *int32, nodeRefsMap map[string]*corev1.Node) (getNodeReferencesResult, error) {
log := ctrl.LoggerFrom(ctx, "providerIDList", len(providerIDList))

var ready, available int
- nodeRefsMap := make(map[string]corev1.Node)
- nodeList := corev1.NodeList{}
- for {
- if err := c.List(ctx, &nodeList, client.Continue(nodeList.Continue)); err != nil {
- return getNodeReferencesResult{}, errors.Wrapf(err, "failed to List nodes")
- }
-
- for _, node := range nodeList.Items {
- if node.Spec.ProviderID == "" {
- log.V(2).Info("No ProviderID detected, skipping", "providerID", node.Spec.ProviderID)
- continue
- }
-
- nodeRefsMap[node.Spec.ProviderID] = node
- }
-
- if nodeList.Continue == "" {
- break
- }
- }

var nodeRefs []corev1.ObjectReference
for _, providerID := range providerIDList {
@@ -185,9 +172,9 @@ func (r *MachinePoolReconciler) getNodeReferences(ctx context.Context, c client.
continue
}
if node, ok := nodeRefsMap[providerID]; ok {
- if noderefutil.IsNodeReady(&node) {
+ if noderefutil.IsNodeReady(node) {
ready++
- if noderefutil.IsNodeAvailable(&node, *minReadySeconds, metav1.Now()) {
+ if noderefutil.IsNodeAvailable(node, *minReadySeconds, metav1.Now()) {
available++
}
}
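getNodeReferences now consumes the providerID-to-Node map built during reconcileInfrastructure instead of listing Nodes itself, and because the map holds *corev1.Node pointers, the &node copies disappear from the readiness checks. The counting relies on standard Node conditions; a self-contained approximation of the noderefutil helpers (assuming the usual NodeReady semantics and pod-style minReadySeconds handling, not the exact upstream implementation):

package sketch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isNodeReady reports whether the Node's NodeReady condition is True.
func isNodeReady(node *corev1.Node) bool {
	for _, c := range node.Status.Conditions {
		if c.Type == corev1.NodeReady {
			return c.Status == corev1.ConditionTrue
		}
	}
	return false
}

// isNodeAvailable treats a Node as available once it has been Ready for at
// least minReadySeconds, mirroring pod availability semantics.
func isNodeAvailable(node *corev1.Node, minReadySeconds int32, now metav1.Time) bool {
	if !isNodeReady(node) {
		return false
	}
	if minReadySeconds == 0 {
		return true
	}
	minReadyDuration := time.Duration(minReadySeconds) * time.Second
	for _, c := range node.Status.Conditions {
		if c.Type == corev1.NodeReady {
			return !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadyDuration).Before(now.Time)
		}
	}
	return false
}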
21 changes: 9 additions & 12 deletions exp/internal/controllers/machinepool_controller_noderef_test.go
@@ -36,9 +36,8 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
Client: fake.NewClientBuilder().Build(),
recorder: record.NewFakeRecorder(32),
}

- nodeList := []client.Object{
- &corev1.Node{
+ nodeRefsMap := map[string]*corev1.Node{
+ "aws://us-east-1/id-node-1": {
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
@@ -54,7 +53,7 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
},
},
},
- &corev1.Node{
+ "aws://us-west-2/id-node-2": {
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
},
@@ -70,15 +69,15 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
},
},
},
- &corev1.Node{
+ "aws://us-west-2/id-node-3": {
ObjectMeta: metav1.ObjectMeta{
Name: "node-3",
},
Spec: corev1.NodeSpec{
ProviderID: "aws://us-west-2/id-node-3",
},
},
- &corev1.Node{
+ "gce://us-central1/gce-id-node-2": {
ObjectMeta: metav1.ObjectMeta{
Name: "gce-node-2",
},
@@ -94,7 +93,7 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
},
},
},
- &corev1.Node{
+ "azure://westus2/id-node-4": {
ObjectMeta: metav1.ObjectMeta{
Name: "azure-node-4",
},
@@ -110,7 +109,7 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
},
},
},
- &corev1.Node{
+ "azure://westus2/id-nodepool1/0": {
ObjectMeta: metav1.ObjectMeta{
Name: "azure-nodepool1-0",
},
@@ -126,7 +125,7 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
},
},
},
- &corev1.Node{
+ "azure://westus2/id-nodepool2/0": {
ObjectMeta: metav1.ObjectMeta{
Name: "azure-nodepool2-0",
},
@@ -144,8 +143,6 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
},
}

- client := fake.NewClientBuilder().WithObjects(nodeList...).Build()
-
testCases := []struct {
name string
providerIDList []string
@@ -284,7 +281,7 @@ func TestMachinePoolGetNodeReference(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
g := NewWithT(t)

- result, err := r.getNodeReferences(ctx, client, test.providerIDList, ptr.To(test.minReadySeconds))
+ result, err := r.getNodeReferences(ctx, test.providerIDList, ptr.To(test.minReadySeconds), nodeRefsMap)
if test.err == nil {
g.Expect(err).ToNot(HaveOccurred())
} else {
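Injecting the providerID-keyed map directly keeps this unit test independent of fake-client listing and pagination. Note that the production lookup is a plain map access, so the keys here must match the entries of Spec.ProviderIDList byte-for-byte; a tiny illustration with hypothetical values:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	nodes := map[string]*corev1.Node{
		"aws://us-east-1/id-node-1": {ObjectMeta: metav1.ObjectMeta{Name: "node-1"}},
	}
	// Exact string equality: no normalization of scheme or casing is applied.
	_, ok := nodes["AWS://us-east-1/id-node-1"]
	fmt.Println(ok) // false
}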
81 changes: 69 additions & 12 deletions exp/internal/controllers/machinepool_controller_phases.go
@@ -177,9 +177,10 @@ func (r *MachinePoolReconciler) reconcileExternal(ctx context.Context, cluster *
}

// reconcileBootstrap reconciles the Spec.Bootstrap.ConfigRef object on a MachinePool.
- func (r *MachinePoolReconciler) reconcileBootstrap(ctx context.Context, cluster *clusterv1.Cluster, m *expv1.MachinePool) (ctrl.Result, error) {
+ func (r *MachinePoolReconciler) reconcileBootstrap(ctx context.Context, s *scope) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
-
+ cluster := s.cluster
+ m := s.machinePool
// Call generic external reconciler if we have an external reference.
var bootstrapConfig *unstructured.Unstructured
if m.Spec.Template.Spec.Bootstrap.ConfigRef != nil {
@@ -241,9 +242,10 @@ func (r *MachinePoolReconciler) reconcileBootstrap(ctx context.Context, cluster
}

// reconcileInfrastructure reconciles the Spec.InfrastructureRef object on a MachinePool.
- func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) {
+ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *scope) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
-
+ cluster := s.cluster
+ mp := s.machinePool
// Call generic external reconciler.
infraReconcileResult, err := r.reconcileExternal(ctx, cluster, mp, &mp.Spec.Template.Spec.InfrastructureRef)
if err != nil {
@@ -283,8 +285,19 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, clu
conditions.WithFallbackValue(ready, clusterv1.WaitingForInfrastructureFallbackReason, clusterv1.ConditionSeverityInfo, ""),
)

- if err := r.reconcileMachines(ctx, mp, infraConfig); err != nil {
- return ctrl.Result{}, errors.Wrapf(err, "failed to reconcile Machines for MachinePool %s", klog.KObj(mp))
+ clusterClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
+ var getNodeRefsErr error
+ // Get the nodeRefsMap from the cluster.
+ s.nodeRefMap, getNodeRefsErr = r.getNodeRefMap(ctx, clusterClient)
+
+ err = r.reconcileMachines(ctx, s, infraConfig)
+
+ if err != nil || getNodeRefsErr != nil {
+ return ctrl.Result{}, kerrors.NewAggregate([]error{errors.Wrapf(err, "failed to reconcile Machines for MachinePool %s", klog.KObj(mp)), errors.Wrapf(getNodeRefsErr, "failed to get nodeRefs for MachinePool %s", klog.KObj(mp))})
}

if !mp.Status.InfrastructureReady {
@@ -328,8 +341,9 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, clu
// infrastructure is created accordingly.
// Note: When supported by the cloud provider implementation of the MachinePool, machines will provide a means to interact
// with the corresponding infrastructure (e.g. delete a specific machine in case MachineHealthCheck detects it is unhealthy).
- func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, mp *expv1.MachinePool, infraMachinePool *unstructured.Unstructured) error {
+ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope, infraMachinePool *unstructured.Unstructured) error {
log := ctrl.LoggerFrom(ctx)
+ mp := s.machinePool

var infraMachineKind string
if err := util.UnstructuredUnmarshalField(infraMachinePool, &infraMachineKind, "status", "infrastructureMachineKind"); err != nil {
@@ -376,15 +390,15 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, mp *expv1
return err
}

- if err := r.createOrUpdateMachines(ctx, mp, machineList.Items, infraMachineList.Items); err != nil {
+ if err := r.createOrUpdateMachines(ctx, s, machineList.Items, infraMachineList.Items); err != nil {
return errors.Wrapf(err, "failed to create machines for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
}

return nil
}

// createOrUpdateMachines creates a MachinePool Machine for each infraMachine if it doesn't already exist and sets the owner reference and infraRef.
- func (r *MachinePoolReconciler) createOrUpdateMachines(ctx context.Context, mp *expv1.MachinePool, machines []clusterv1.Machine, infraMachines []unstructured.Unstructured) error {
+ func (r *MachinePoolReconciler) createOrUpdateMachines(ctx context.Context, s *scope, machines []clusterv1.Machine, infraMachines []unstructured.Unstructured) error {
log := ctrl.LoggerFrom(ctx)

// Construct a set of names of infraMachines that already have a Machine.
@@ -398,19 +412,30 @@ func (r *MachinePoolReconciler) createOrUpdateMachines(ctx context.Context, mp *
var errs []error
for i := range infraMachines {
infraMachine := &infraMachines[i]

+ // Get Spec.ProviderID from the infraMachine.
+ var providerID string
+ var node *corev1.Node
+ if err := util.UnstructuredUnmarshalField(infraMachine, &providerID, "spec", "providerID"); err != nil {
+ log.V(4).Info("could not retrieve providerID for infraMachine", "infraMachine", klog.KObj(infraMachine))
+ } else {
+ // Retrieve the Node for the infraMachine from the nodeRefsMap using the providerID.
+ node = s.nodeRefMap[providerID]
+ }

// If infraMachine already has a Machine, update it if needed.
if existingMachine, ok := infraMachineToMachine[infraMachine.GetName()]; ok {
log.V(2).Info("Patching existing Machine for infraMachine", infraMachine.GetKind(), klog.KObj(infraMachine), "Machine", klog.KObj(&existingMachine))

- desiredMachine := computeDesiredMachine(mp, infraMachine, &existingMachine)
+ desiredMachine := r.computeDesiredMachine(s.machinePool, infraMachine, &existingMachine, node)
if err := ssa.Patch(ctx, r.Client, MachinePoolControllerName, desiredMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: &existingMachine}); err != nil {
log.Error(err, "failed to update Machine", "Machine", klog.KObj(desiredMachine))
errs = append(errs, errors.Wrapf(err, "failed to update Machine %q", klog.KObj(desiredMachine)))
}
} else {
// Otherwise create a new Machine for the infraMachine.
log.Info("Creating new Machine for infraMachine", "infraMachine", klog.KObj(infraMachine))
- machine := computeDesiredMachine(mp, infraMachine, nil)
+ machine := r.computeDesiredMachine(s.machinePool, infraMachine, nil, node)

if err := ssa.Patch(ctx, r.Client, MachinePoolControllerName, machine); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to create new Machine for infraMachine %q in namespace %q", infraMachine.GetName(), infraMachine.GetNamespace()))
@@ -432,14 +457,19 @@ func (r *MachinePoolReconciler) createOrUpdateMachines(ctx context.Context, mp *

// computeDesiredMachine constructs the desired Machine for an infraMachine.
// If the Machine exists, it ensures the Machine is always owned by the MachinePool.
- func computeDesiredMachine(mp *expv1.MachinePool, infraMachine *unstructured.Unstructured, existingMachine *clusterv1.Machine) *clusterv1.Machine {
+ func (r *MachinePoolReconciler) computeDesiredMachine(mp *expv1.MachinePool, infraMachine *unstructured.Unstructured, existingMachine *clusterv1.Machine, existingNode *corev1.Node) *clusterv1.Machine {
infraRef := corev1.ObjectReference{
APIVersion: infraMachine.GetAPIVersion(),
Kind: infraMachine.GetKind(),
Name: infraMachine.GetName(),
Namespace: infraMachine.GetNamespace(),
}

+ var kubernetesVersion *string
+ if existingNode != nil && existingNode.Status.NodeInfo.KubeletVersion != "" {
+ kubernetesVersion = &existingNode.Status.NodeInfo.KubeletVersion
+ }

machine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: infraMachine.GetName(),
Expand All @@ -452,6 +482,7 @@ func computeDesiredMachine(mp *expv1.MachinePool, infraMachine *unstructured.Uns
Spec: clusterv1.MachineSpec{
ClusterName: mp.Spec.ClusterName,
InfrastructureRef: infraRef,
+ Version: kubernetesVersion,
},
}

@@ -537,3 +568,29 @@ func (r *MachinePoolReconciler) waitForMachineCreation(ctx context.Context, mach

return nil
}

+ func (r *MachinePoolReconciler) getNodeRefMap(ctx context.Context, c client.Client) (map[string]*corev1.Node, error) {
+ log := ctrl.LoggerFrom(ctx)
+ nodeRefsMap := make(map[string]*corev1.Node)
+ nodeList := corev1.NodeList{}
+ for {
+ if err := c.List(ctx, &nodeList, client.Continue(nodeList.Continue)); err != nil {
+ return nil, err
+ }
+
+ for _, node := range nodeList.Items {
+ if node.Spec.ProviderID == "" {
+ log.V(2).Info("No ProviderID detected, skipping", "providerID", node.Spec.ProviderID)
+ continue
+ }
+
+ nodeRefsMap[node.Spec.ProviderID] = &node
+ }
+
+ if nodeList.Continue == "" {
+ break
+ }
+ }
+
+ return nodeRefsMap, nil
+ }
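getNodeRefMap drains the paginated Node list by threading nodeList.Continue back into the next List call. Taking &node inside the range loop is safe under Go 1.22+ semantics, where each iteration gets a fresh loop variable; on older Go, every map value would have aliased the same variable. A standalone sketch of the pagination pattern (the explicit page size is an illustrative addition; the function above relies on the server default):

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listAllNodes accumulates every page of Nodes; an empty Continue token on
// the returned list marks the final page.
func listAllNodes(ctx context.Context, c client.Client) ([]corev1.Node, error) {
	var all []corev1.Node
	nodeList := corev1.NodeList{}
	for {
		if err := c.List(ctx, &nodeList, client.Continue(nodeList.Continue), client.Limit(100)); err != nil {
			return nil, err
		}
		all = append(all, nodeList.Items...)
		if nodeList.Continue == "" {
			return all, nil
		}
	}
}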