Skip to content

Commit

Permalink
Merge pull request #280 from Revolyssup/multicontext
Browse files Browse the repository at this point in the history
Multi context refactor
  • Loading branch information
Revolyssup authored Jun 11, 2022
2 parents 0628edf + 406f1bb commit 01c2751
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 83 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ replace (
)

require (
github.com/layer5io/meshery-adapter-library v0.5.4
github.com/layer5io/meshery-adapter-library v0.5.6
github.com/layer5io/meshkit v0.5.20
github.com/layer5io/service-mesh-performance v0.3.4
gopkg.in/yaml.v2 v2.4.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,12 @@ github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3
github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw=
github.com/layer5io/meshery-adapter-library v0.5.4 h1:QQ+nVGHd7KhV58KhY40V00kC+IEM4+AlOhQcSHSbOUE=
github.com/layer5io/meshery-adapter-library v0.5.4/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M=
github.com/layer5io/meshery-adapter-library v0.5.5 h1:4dGsHBDCLnkOOA/RaUM5n4bPdEdySREnkd3raU9LSDI=
github.com/layer5io/meshery-adapter-library v0.5.5/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M=
github.com/layer5io/meshery-adapter-library v0.5.6 h1:pbZTMkWNcGWPk314K7WhO4UGVxSnKvGLmwQXBWZ05GI=
github.com/layer5io/meshery-adapter-library v0.5.6/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M=
github.com/layer5io/meshkit v0.5.16/go.mod h1:tj5TAjty7T/WJ8YvlDfOZF94t4g3mhWuKBCc6MOUoNU=
github.com/layer5io/meshkit v0.5.18/go.mod h1:tj5TAjty7T/WJ8YvlDfOZF94t4g3mhWuKBCc6MOUoNU=
github.com/layer5io/meshkit v0.5.20 h1:QpN/SEepUZk+Jj2K4TBRZJCRr/pzuvHqDaUr30vWddI=
github.com/layer5io/meshkit v0.5.20/go.mod h1:EUfXIcztap9Dh0Ao3Dmoxf3FMsm4h7zFHGwagj+5ra4=
github.com/layer5io/service-mesh-performance v0.3.2-0.20210122142912-a94e0658b021/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ=
Expand Down
4 changes: 2 additions & 2 deletions kuma/custom_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package kuma

import "github.com/layer5io/meshery-adapter-library/status"

func (kuma *Kuma) applyCustomOperation(namespace string, manifest string, isDel bool) (string, error) {
func (kuma *Kuma) applyCustomOperation(namespace string, manifest string, isDel bool, kubeconfigs []string) (string, error) {
st := status.Starting

err := kuma.applyManifest(isDel, namespace, []byte(manifest))
err := kuma.applyManifest(isDel, namespace, []byte(manifest), kubeconfigs)
if err != nil {
return st, ErrCustomOperation(err)
}
Expand Down
156 changes: 123 additions & 33 deletions kuma/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,67 @@ import (
"os/exec"
"path"
"runtime"
"sync"

"github.com/layer5io/meshery-adapter-library/adapter"
"github.com/layer5io/meshery-adapter-library/status"
"github.com/layer5io/meshery-kuma/internal/config"
"github.com/layer5io/meshkit/models"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
"gopkg.in/yaml.v2"
)

const (
kumaRepository = "https://kumahq.github.io/charts"
kumaChartName = "kuma"
)

func (kuma *Kuma) installKuma(del bool, useManifest bool, namespace string, version string) (string, error) {
//CreateKubeconfigs creates and writes passed kubeconfig onto the filesystem
func (kuma *Kuma) CreateKubeconfigs(kubeconfigs []string) error {
var errs = make([]error, 0)
for _, kubeconfig := range kubeconfigs {
kconfig := models.Kubeconfig{}
err := yaml.Unmarshal([]byte(kubeconfig), &kconfig)
if err != nil {
errs = append(errs, err)
continue
}

// To have control over what exactly to take in on kubeconfig
kuma.KubeconfigHandler.SetKey("kind", kconfig.Kind)
kuma.KubeconfigHandler.SetKey("apiVersion", kconfig.APIVersion)
kuma.KubeconfigHandler.SetKey("current-context", kconfig.CurrentContext)
err = kuma.KubeconfigHandler.SetObject("preferences", kconfig.Preferences)
if err != nil {
errs = append(errs, err)
continue
}

err = kuma.KubeconfigHandler.SetObject("clusters", kconfig.Clusters)
if err != nil {
errs = append(errs, err)
continue
}

err = kuma.KubeconfigHandler.SetObject("users", kconfig.Users)
if err != nil {
errs = append(errs, err)
continue
}

err = kuma.KubeconfigHandler.SetObject("contexts", kconfig.Contexts)
if err != nil {
errs = append(errs, err)
continue
}
}
if len(errs) == 0 {
return nil
}
return mergeErrors(errs)
}

func (kuma *Kuma) installKuma(del bool, useManifest bool, namespace string, version string, kubeconfigs []string) (string, error) {
st := status.Installing

if del {
Expand All @@ -35,27 +83,27 @@ func (kuma *Kuma) installKuma(del bool, useManifest bool, namespace string, vers
return st, ErrMeshConfig(err)
}
if useManifest {
return kuma.installUsingManifests(del, st, namespace, version)
return kuma.installUsingManifests(del, st, namespace, version, kubeconfigs)
}
kuma.Log.Info("Installing kuma using helm charts...")
err = kuma.applyHelmChart(del, version, namespace)
err = kuma.applyHelmChart(del, version, namespace, kubeconfigs)
if err != nil {

kuma.Log.Info("Failed helm installation. ", err)
kuma.Log.Info("Trying installing from manifests...")
return kuma.installUsingManifests(del, st, namespace, version)
return kuma.installUsingManifests(del, st, namespace, version, kubeconfigs)
}
return status.Installed, nil
}
func (kuma *Kuma) installUsingManifests(del bool, st string, namespace string, version string) (string, error) {
func (kuma *Kuma) installUsingManifests(del bool, st string, namespace string, version string, kubeconfigs []string) (string, error) {
kuma.Log.Info("Installing kuma using manifests...")
manifest, err := kuma.fetchManifest(version)
if err != nil {
kuma.Log.Error(ErrInstallKuma(err))
return st, ErrInstallKuma(err)
}

err = kuma.applyManifest(del, namespace, []byte(manifest))
err = kuma.applyManifest(del, namespace, []byte(manifest), kubeconfigs)
if err != nil {
kuma.Log.Error(ErrInstallKuma(err))
return st, ErrInstallKuma(err)
Expand All @@ -66,11 +114,7 @@ func (kuma *Kuma) installUsingManifests(del bool, st string, namespace string, v
}
return status.Installed, nil
}
func (kuma *Kuma) applyHelmChart(del bool, version, namespace string) error {
kClient := kuma.MesheryKubeclient
if kClient == nil {
return ErrNilClient
}
func (kuma *Kuma) applyHelmChart(del bool, version, namespace string, kubeconfigs []string) error {
chartVersion, err := mesherykube.HelmAppVersionToChartVersion(
kumaRepository,
kumaChartName,
Expand All @@ -85,19 +129,43 @@ func (kuma *Kuma) applyHelmChart(del bool, version, namespace string) error {
} else {
act = mesherykube.INSTALL
}
err = kClient.ApplyHelmChart(mesherykube.ApplyHelmChartConfig{
ChartLocation: mesherykube.HelmChartLocation{
Repository: kumaRepository,
Chart: kumaChartName,
Version: chartVersion,
},
Namespace: namespace,
Action: act,
CreateNamespace: true,
ReleaseName: kumaChartName,
})
if err != nil {
return ErrApplyHelmChart(err)
var errs []error
var wg sync.WaitGroup
var errMx sync.Mutex
for _, kubeconfig := range kubeconfigs {
wg.Add(1)
go func(kubeconfig string) {
defer wg.Done()
kClient, err := mesherykube.New([]byte(kubeconfig))
if err != nil {
errMx.Lock()
errs = append(errs, err)
errMx.Unlock()
return
}
err = kClient.ApplyHelmChart(mesherykube.ApplyHelmChartConfig{
ChartLocation: mesherykube.HelmChartLocation{
Repository: kumaRepository,
Chart: kumaChartName,
Version: chartVersion,
},
Namespace: namespace,
Action: act,
CreateNamespace: true,
ReleaseName: kumaChartName,
})
if err != nil {
errMx.Lock()
errs = append(errs, err)
errMx.Unlock()
return
}
}(kubeconfig)
}
wg.Wait()

if len(errs) != 0 {
return ErrApplyHelmChart(mergeErrors(errs))
}
return nil
}
Expand Down Expand Up @@ -127,17 +195,39 @@ func (kuma *Kuma) fetchManifest(version string) (string, error) {
return out.String(), nil
}

func (kuma *Kuma) applyManifest(del bool, namespace string, contents []byte) error {
func (kuma *Kuma) applyManifest(del bool, namespace string, contents []byte, kubeconfigs []string) error {
var errs []error
var wg sync.WaitGroup
var errMx sync.Mutex
for _, kubconfig := range kubeconfigs {
wg.Add(1)
go func(kubconfig string) {
defer wg.Done()
kClient, err := mesherykube.New([]byte(kubconfig))
if err != nil {
errMx.Lock()
errs = append(errs, err)
errMx.Unlock()
return
}
err = kClient.ApplyManifest(contents, mesherykube.ApplyOptions{
Namespace: namespace,
Update: true,
Delete: del,
})
if err != nil {
errMx.Lock()
errs = append(errs, err)
errMx.Unlock()
return
}
}(kubconfig)

err := kuma.MesheryKubeclient.ApplyManifest(contents, mesherykube.ApplyOptions{
Namespace: namespace,
Update: true,
Delete: del,
})
if err != nil {
return err
}

wg.Wait()
if len(errs) != 0 {
return mergeErrors(errs)
}
return nil
}

Expand Down
34 changes: 23 additions & 11 deletions kuma/kuma.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ func New(c meshkitCfg.Handler, l logger.Handler, kc meshkitCfg.Handler) adapter.
}

// ApplyOperation applies the operation on kuma
func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationRequest) error {

func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationRequest, hchan *chan interface{}) error {
err := kuma.CreateKubeconfigs(opReq.K8sConfigs)
if err != nil {
return err
}
kuma.SetChannel(hchan)
kubeconfigs := opReq.K8sConfigs
operations := adapter.Operations{}
err := kuma.Config.GetObject(adapter.OperationsKey, &operations)
err = kuma.Config.GetObject(adapter.OperationsKey, &operations)
if err != nil {
return err
}
Expand All @@ -55,7 +60,7 @@ func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationReq
case internalconfig.KumaOperation:
go func(hh *Kuma, ee *adapter.Event) {
version := string(operations[opReq.OperationName].Versions[0])
stat, err := hh.installKuma(opReq.IsDeleteOperation, false, opReq.Namespace, version)
stat, err := hh.installKuma(opReq.IsDeleteOperation, false, opReq.Namespace, version, kubeconfigs)
if err != nil {
e.Summary = fmt.Sprintf("Error while %s Kuma service mesh", stat)
e.Details = err.Error()
Expand All @@ -69,7 +74,7 @@ func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationReq
case common.BookInfoOperation, common.HTTPBinOperation, common.ImageHubOperation, common.EmojiVotoOperation:
go func(hh *Kuma, ee *adapter.Event) {
appName := operations[opReq.OperationName].AdditionalProperties[common.ServiceName]
stat, err := hh.installSampleApp(opReq.IsDeleteOperation, opReq.Namespace, operations[opReq.OperationName].Templates)
stat, err := hh.installSampleApp(opReq.IsDeleteOperation, opReq.Namespace, operations[opReq.OperationName].Templates, kubeconfigs)
if err != nil {
e.Summary = fmt.Sprintf("Error while %s %s application", stat, appName)
e.Details = err.Error()
Expand All @@ -86,6 +91,7 @@ func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationReq
_, err := hh.RunSMITest(adapter.SMITestOptions{
Ctx: context.TODO(),
OperationID: ee.Operationid,
Kubeconfigs: kubeconfigs,
Manifest: string(operations[opReq.OperationName].Templates[0]),
Namespace: "meshery",
Labels: map[string]string{
Expand All @@ -102,7 +108,7 @@ func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationReq
}(kuma, e)
case common.CustomOperation:
go func(hh *Kuma, ee *adapter.Event) {
stat, err := hh.applyCustomOperation(opReq.Namespace, opReq.CustomBody, opReq.IsDeleteOperation)
stat, err := hh.applyCustomOperation(opReq.Namespace, opReq.CustomBody, opReq.IsDeleteOperation, kubeconfigs)
if err != nil {
e.Summary = fmt.Sprintf("Error while %s custom operation", stat)
e.Details = err.Error()
Expand All @@ -121,7 +127,13 @@ func (kuma *Kuma) ApplyOperation(ctx context.Context, opReq adapter.OperationReq
}

// ProcessOAM will handles the grpc invocation for handling OAM objects
func (kuma *Kuma) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) {
func (kuma *Kuma) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) {
err := kuma.CreateKubeconfigs(oamReq.K8sConfigs)
if err != nil {
return "", err
}
kuma.SetChannel(hchan)
kubeconfigs := oamReq.K8sConfigs
var comps []v1alpha1.Component
for _, acomp := range oamReq.OamComps {
comp, err := oam.ParseApplicationComponent(acomp)
Expand All @@ -141,13 +153,13 @@ func (kuma *Kuma) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (st
// If operation is delete then first HandleConfiguration and then handle the deployment
if oamReq.DeleteOp {
// Process configuration
msg2, err := kuma.HandleApplicationConfiguration(config, oamReq.DeleteOp)
msg2, err := kuma.HandleApplicationConfiguration(config, oamReq.DeleteOp, kubeconfigs)
if err != nil {
return msg2, ErrProcessOAM(err)
}

// Process components
msg1, err := kuma.HandleComponents(comps, oamReq.DeleteOp)
msg1, err := kuma.HandleComponents(comps, oamReq.DeleteOp, kubeconfigs)
if err != nil {
return msg1 + "\n" + msg2, ErrProcessOAM(err)
}
Expand All @@ -156,13 +168,13 @@ func (kuma *Kuma) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (st
}

// Process components
msg1, err := kuma.HandleComponents(comps, oamReq.DeleteOp)
msg1, err := kuma.HandleComponents(comps, oamReq.DeleteOp, kubeconfigs)
if err != nil {
return msg1, ErrProcessOAM(err)
}

// Process configuration
msg2, err := kuma.HandleApplicationConfiguration(config, oamReq.DeleteOp)
msg2, err := kuma.HandleApplicationConfiguration(config, oamReq.DeleteOp, kubeconfigs)
if err != nil {
return msg1 + "\n" + msg2, ErrProcessOAM(err)
}
Expand Down
Loading

0 comments on commit 01c2751

Please sign in to comment.