Skip to content

Commit

Permalink
controllers: Add Egress watcher
Browse files Browse the repository at this point in the history
This commit adds Egress watcher to let coild to watch Egress resource
to update the configuration for the existing NAT clients, following the
change of the Egress. Each coild watches and updates the configuration
for the Pods scheduled on the same node.

Signed-off-by: Yusuke Suzuki <[email protected]>
  • Loading branch information
ysksuzuki committed Oct 1, 2023
1 parent f7fd5e0 commit 972b5cb
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 2 deletions.
1 change: 1 addition & 0 deletions v2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ config/rbac/coild_role.yaml: $(COILD_DEPENDS)
-rm -rf work
mkdir work
sed '0,/^package/s/.*/package work/' controllers/blockrequest_watcher.go > work/blockrequest_watcher.go
sed '0,/^package/s/.*/package work/' controllers/egress_watcher.go > work/egress_watcher.go
sed '0,/^package/s/.*/package work/' pkg/ipam/node.go > work/node.go
sed '0,/^package/s/.*/package work/' runners/coild_server.go > work/coild_server.go
$(CONTROLLER_GEN) rbac:roleName=coild paths=./work output:stdout > $@
Expand Down
17 changes: 16 additions & 1 deletion v2/cmd/coild/sub/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
coilv2 "github.com/cybozu-go/coil/v2/api/v2"
"github.com/cybozu-go/coil/v2/controllers"
"github.com/cybozu-go/coil/v2/pkg/constants"
"github.com/cybozu-go/coil/v2/pkg/indexing"
"github.com/cybozu-go/coil/v2/pkg/ipam"
"github.com/cybozu-go/coil/v2/pkg/nodenet"
"github.com/cybozu-go/coil/v2/runners"
Expand Down Expand Up @@ -123,8 +124,22 @@ func subMain() error {
return err
}

egressWatcher := &controllers.EgressWatcher{
Client: mgr.GetClient(),
NodeName: nodeName,
PodNet: podNet,
EgressPort: config.egressPort,
}
if err := egressWatcher.SetupWithManager(mgr); err != nil {
return err
}
ctx2 := ctrl.SetupSignalHandler()
if err := indexing.SetupIndexForPodByNodeName(ctx2, mgr); err != nil {
return err
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx2); err != nil {
setupLog.Error(err, "problem running manager")
return err
}
Expand Down
2 changes: 2 additions & 0 deletions v2/config/rbac/coild_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ rules:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- coil.cybozu.com
resources:
Expand Down
198 changes: 198 additions & 0 deletions v2/controllers/egress_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package controllers

import (
"context"
"errors"
"fmt"
"github.com/go-logr/logr"
"net"
"strings"

coilv2 "github.com/cybozu-go/coil/v2/api/v2"
"github.com/cybozu-go/coil/v2/pkg/constants"
"github.com/cybozu-go/coil/v2/pkg/founat"
"github.com/cybozu-go/coil/v2/pkg/nodenet"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type EgressWatcher struct {
client.Client
NodeName string
PodNet nodenet.PodNetwork
EgressPort int
}

// +kubebuilder:rbac:groups=coil.cybozu.com,resources=egresses,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch

// Reconcile implements Reconciler interface.
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *EgressWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

logger.Info("start reconciling egress")
eg := &coilv2.Egress{}
if err := r.Get(ctx, req.NamespacedName, eg); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
logger.Error(err, "failed to get egress")
return ctrl.Result{}, err
}
if eg.DeletionTimestamp != nil {
return ctrl.Result{}, nil
}

pods := &corev1.PodList{}
err := r.Client.List(ctx, pods, client.MatchingFields{
constants.PodNodeNameKey: r.NodeName,
})
if err != nil {
logger.Error(err, "failed to list Pod")
return ctrl.Result{}, err
}

for _, pod := range pods.Items {
for k, v := range pod.Annotations {
if !strings.HasPrefix(k, constants.AnnEgressPrefix) {
continue
}

if k[len(constants.AnnEgressPrefix):] != eg.Namespace {
continue
}

// shortcut for the most typical case
if v == eg.Name {
// Do reconcile
if err := r.reconcileEgressClient(ctx, eg, &pod, &logger); err != nil {
logger.Error(err, "failed to reconcile Egress client pod")
return ctrl.Result{}, err
}
continue
}

for _, n := range strings.Split(v, ",") {
if n == eg.Name {
if err := r.reconcileEgressClient(ctx, eg, &pod, &logger); err != nil {
logger.Error(err, "failed to reconcile Egress client pod")
return ctrl.Result{}, err
}
continue
}
}
}
}
return ctrl.Result{}, nil
}

func (r *EgressWatcher) reconcileEgressClient(ctx context.Context, eg *coilv2.Egress, pod *corev1.Pod, logger *logr.Logger) error {
hook, err := r.getHook(ctx, eg, logger)
if err != nil {
return fmt.Errorf("failed to setup NAT hook: %w", err)
}

var ipv4, ipv6 net.IP
for _, podIP := range pod.Status.PodIPs {
ip := net.ParseIP(podIP.IP)
if ip.To4() != nil {
ipv4 = ip.To4()
continue
}
if ip.To16() != nil {
ipv6 = ip.To16()
}
}
if err := r.PodNet.Update(ipv4, ipv6, hook); err != nil {
return fmt.Errorf("failed to update NAT configuration: %w", err)
}

return nil
}

type gwNets struct {
gateway net.IP
networks []*net.IPNet
sportAuto bool
}

func (r *EgressWatcher) getHook(ctx context.Context, eg *coilv2.Egress, logger *logr.Logger) (nodenet.SetupHook, error) {
var gw gwNets
svc := &corev1.Service{}

if err := r.Get(ctx, client.ObjectKey{Namespace: eg.Namespace, Name: eg.Name}, svc); err != nil {
return nil, err
}

// as of k8s 1.19, dual stack Service is alpha and will be re-written
// in 1.20. So, we cannot use dual stack services.
svcIP := net.ParseIP(svc.Spec.ClusterIP)
if svcIP == nil {
return nil, fmt.Errorf("invalid ClusterIP in Service %s %s", eg.Name, svc.Spec.ClusterIP)
}
var subnets []*net.IPNet
if ip4 := svcIP.To4(); ip4 != nil {
svcIP = ip4
for _, sn := range eg.Spec.Destinations {
_, subnet, err := net.ParseCIDR(sn)
if err != nil {
return nil, fmt.Errorf("invalid network in Egress %s", eg.Name)
}
if subnet.IP.To4() != nil {
subnets = append(subnets, subnet)
}
}
} else {
for _, sn := range eg.Spec.Destinations {
_, subnet, err := net.ParseCIDR(sn)
if err != nil {
return nil, fmt.Errorf("invalid network in Egress %s", eg.Name)
}
if subnet.IP.To4() == nil {
subnets = append(subnets, subnet)
}
}
}

if len(subnets) > 0 {
gw = gwNets{gateway: svcIP, networks: subnets, sportAuto: eg.Spec.FouSourcePortAuto}
return r.hook(gw, logger), nil
}

return nil, nil
}

func (r *EgressWatcher) hook(gwn gwNets, log *logr.Logger) func(ipv4, ipv6 net.IP) error {
return func(ipv4, ipv6 net.IP) error {
// We assume that coild already has configured NAT for the client,
// so we don't need to call Init functions here.
ft := founat.NewFoUTunnel(r.EgressPort, ipv4, ipv6)
cl := founat.NewNatClient(ipv4, ipv6, nil)

link, err := ft.AddPeer(gwn.gateway, gwn.sportAuto)
if errors.Is(err, founat.ErrIPFamilyMismatch) {
// ignore unsupported IP family link
log.Info("ignored unsupported gateway", "gw", gwn.gateway)
return nil
}
if err != nil {
return err
}
if err := cl.AddEgress(link, gwn.networks); err != nil {
return err
}

return nil
}
}

// SetupWithManager registers this with the manager.
func (r *EgressWatcher) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&coilv2.Egress{}).
Complete(r)
}
Loading

0 comments on commit 972b5cb

Please sign in to comment.