diff --git a/apistructs/addon.go b/apistructs/addon.go index c69c343961e..af85ba1c111 100644 --- a/apistructs/addon.go +++ b/apistructs/addon.go @@ -327,6 +327,8 @@ const ( AddonCloudGateway = "alicloud-gateway" // sourcecov code coverage agent AddonSourcecov = "sourcecov" + // influxdb + AddonInfluxDB = "influxdb" OriginalReplicas = "original_replicas" diff --git a/go.mod b/go.mod index 5415d12dd80..0c6b8abc1b0 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/IBM/sarama v1.43.2 github.com/Masterminds/semver v1.5.0 github.com/WeiZhang555/tabwriter v0.0.0-20200115015932-e5c45f4da38d + github.com/agiledragon/gomonkey/v2 v2.12.0 github.com/ahmetb/go-linq/v3 v3.2.0 github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38 github.com/alibabacloud-go/bailian-20230601 v1.0.0 diff --git a/go.sum b/go.sum index 677b8afd853..36e72c15c0b 100644 --- a/go.sum +++ b/go.sum @@ -852,6 +852,8 @@ github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= +github.com/agiledragon/gomonkey/v2 v2.12.0 h1:ek0dYu9K1rSV+TgkW5LvNNPRWyDZVIxGMCFI6Pz9o38= +github.com/agiledragon/gomonkey/v2 v2.12.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/agrison/go-tablib v0.0.0-20160310143025-4930582c22ee h1:0RklYSvekYaIFI9JUx7TFPQvo++TdILmZiV17QI4nXk= github.com/agrison/go-tablib v0.0.0-20160310143025-4930582c22ee/go.mod h1:M9nmO4lBRWR/bBv7UCOmDJ1MB2DVoqz19B4JchDA+K0= github.com/agrison/mxj v0.0.0-20160310142625-1269f8afb3b4 h1:XBNSe5eibe5Fh131ah+xnO6s4A97U1T3tKZKLQQvqu0= diff --git a/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset.go b/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset.go index 2d0542c86c8..25a1883f109 100644 --- a/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset.go +++ b/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset.go @@ -217,10 +217,6 @@ func (k *Kubernetes) createStatefulSet(ctx context.Context, info types.Statefuls } //setEnv(container, info.envs, info.sg, info.namespace) - if info.Namespace == "fake-test" { - return nil - } - SetPodAnnotationsBaseContainerEnvs(set.Spec.Template.Spec.Containers[0], set.Spec.Template.Annotations) return k.sts.Create(set) diff --git a/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset_test.go b/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset_test.go index f840e458767..53a153c8730 100644 --- a/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset_test.go +++ b/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset_test.go @@ -35,10 +35,8 @@ import ( "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/k8sservice" "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/namespace" "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/persistentvolumeclaim" - "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/secret" "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/statefulset" "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/toleration" - "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/types" "github.com/erda-project/erda/pkg/http/httpclient" "github.com/erda-project/erda/pkg/parser/diceyml" "github.com/erda-project/erda/pkg/strutil" @@ -295,79 +293,6 @@ func TestParseJobSpecTemplate(t *testing.T) { assert.Equal(t, clusterInfo["MOUNTPOINT_PATH"], newPath) } -func TestCreateStatefulSet(t *testing.T) { - kubernetes := &Kubernetes{ - secret: &secret.Secret{}, - } - - defer monkey.UnpatchAll() - - monkey.PatchInstanceMethod(reflect.TypeOf(kubernetes.secret), "Get", func(sec *secret.Secret, namespace, name string) (*apiv1.Secret, error) { - b := []byte{} - return &apiv1.Secret{Data: map[string][]byte{".dockerconfigjson": b}}, nil - }) - - info := types.StatefulsetInfo{ - Sg: &apistructs.ServiceGroup{ - Dice: apistructs.Dice{ - ID: "fake-test-dice", - Type: "addon", - Labels: nil, - Services: []apistructs.Service{ - { - Name: "fake-test-service", - Namespace: "fake-test", - Image: "", - ImageUsername: "", - ImagePassword: "", - Cmd: "", - Ports: nil, - ProxyPorts: nil, - Vip: "", - ShortVIP: "", - ProxyIp: "", - PublicIp: "", - Scale: 0, - Resources: apistructs.Resources{ - Cpu: 100, - Mem: 200, - Disk: 0, - }, - Depends: nil, - Env: nil, - Labels: map[string]string{"ADDON_GROUP_ID": "11111111"}, - DeploymentLabels: nil, - Selectors: nil, - Binds: nil, - Volumes: nil, - Hosts: nil, - HealthCheck: nil, - NewHealthCheck: nil, - SideCars: nil, - InitContainer: nil, - InstanceInfos: nil, - MeshEnable: nil, - TrafficSecurity: diceyml.TrafficSecurity{}, - WorkLoad: "", - ProjectServiceName: "", - K8SSnippet: nil, - StatusDesc: apistructs.StatusDesc{}, - }, - }, - ServiceDiscoveryKind: "", - ServiceDiscoveryMode: "", - ProjectNamespace: "", - }, - }, - Namespace: "fake-test", - Envs: map[string]string{}, - Annotations: map[string]string{}, - } - - err := kubernetes.createStatefulSet(context.Background(), info) - assert.Nil(t, err) -} - func Test_scaleStatefulSet(t *testing.T) { k := &Kubernetes{ sts: &statefulset.StatefulSet{}, diff --git a/internal/tools/orchestrator/services/addon/addon_deploy.go b/internal/tools/orchestrator/services/addon/addon_deploy.go index 038eaebd888..b0552646c4f 100644 --- a/internal/tools/orchestrator/services/addon/addon_deploy.go +++ b/internal/tools/orchestrator/services/addon/addon_deploy.go @@ -95,6 +95,8 @@ func (a *Addon) GetAddonResourceStatus(addonIns *dbclient.AddonInstance, case apistructs.AddonSourcecov: asm := &SourcecovAddonManagement{bdl: a.bdl, org: a.org} configMap, err = asm.DeployStatus(addonIns, &serviceGroup) + case apistructs.AddonInfluxDB: + configMap, err = a.InfluxDBDeployStatus(addonIns, &serviceGroup) default: // 非基础addon,走通用的处理逻辑 configMap, err = a.CommonDeployStatus(addonIns, &serviceGroup, addonDice, addonSpec) @@ -638,6 +640,8 @@ func (a *Addon) BuildAddonRequestGroup(params *apistructs.AddonHandlerCreateItem } sam := &SourcecovAddonManagement{bdl: a.bdl, org: a.org} buildErr = sam.BuildSourceCovServiceItem(params, addonIns, addonSpec, addonDice, &clusterInfo) + case apistructs.AddonInfluxDB: + buildErr = a.BuildInfluxDBServiceItem(params, addonIns, addonSpec, addonDice) default: //default case buildErr = a.BuildCommonServiceItem(params, addonIns, addonSpec, addonDice, &clusterInfo) } diff --git a/internal/tools/orchestrator/services/addon/addon_influxdb.go b/internal/tools/orchestrator/services/addon/addon_influxdb.go new file mode 100644 index 00000000000..b8e77dbd6b0 --- /dev/null +++ b/internal/tools/orchestrator/services/addon/addon_influxdb.go @@ -0,0 +1,176 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package addon + +import ( + "crypto/md5" + "fmt" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/erda-project/erda/apistructs" + "github.com/erda-project/erda/internal/tools/orchestrator/dbclient" + "github.com/erda-project/erda/pkg/parser/diceyml" +) + +const ( + InfluxDBDataPath = "/var/lib/influxdb2" + InfluxDBConfHost = "INFLUX_HOST" + InfluxDBConfUser = "INFLUX_USERNAME" + InfluxDBConfPassword = "INFLUX_PASSWORD" + InfluxDBConfORG = "INFLUX_ORG" + InfluxDBServicePort = "8086" +) + +const ( + InfluxDBKMSPasswordKey = "influxdb-password" +) + +const ( + InfluxDBInitPrefix = "DOCKER_INFLUXDB_INIT_" + + InfluxDBInitUserNameKey = InfluxDBInitPrefix + "USERNAME" + InfluxDBInitUserName = "influxdb" + InfluxDBInitPasswordKey = InfluxDBInitPrefix + "PASSWORD" + + // InfluxDBInitModeKey default init mode: setup + InfluxDBInitModeKey = InfluxDBInitPrefix + "MODE" + InfluxDBInitMode = "setup" + + // InfluxDBInitOrgKey erda project name -> influxdb org + InfluxDBInitOrgKey = InfluxDBInitPrefix + "ORG" + + // InfluxDBInitBucketKey erda application name -> influxdb bucket + InfluxDBInitBucketKey = InfluxDBInitPrefix + "BUCKET" + InfluxDBInitBucket = "erda" + + // InfluxDBInitRetentionKey default 1w + InfluxDBInitRetentionKey = InfluxDBInitPrefix + "RETENTION" + InfluxDBInitRetention = "1w" +) + +const ( + InfluxDBParamsOrg = "org" + InfluxDBParamsBucket = "bucket" + InfluxDBParamsRetention = "retention" +) + +// BuildInfluxDBServiceItem build influxdb service item +func (a *Addon) BuildInfluxDBServiceItem(params *apistructs.AddonHandlerCreateItem, addonIns *dbclient.AddonInstance, + addonSpec *apistructs.AddonExtension, addonDice *diceyml.Object) error { + addonDeployPlan := addonSpec.Plan[params.Plan] + serviceMap := diceyml.Services{} + + // InfluxDB support 1 node only now. + serviceItem := *addonDice.Services[addonSpec.Name] + serviceItem.Resources = diceyml.Resources{ + CPU: addonDeployPlan.CPU, + MaxCPU: addonDeployPlan.MaxCPU, + Mem: addonDeployPlan.Mem, + MaxMem: addonDeployPlan.MaxMem, + } + + // init config render + if err := a.influxDBInitRender(params, addonIns, serviceItem.Envs); err != nil { + return err + } + + // label + if len(serviceItem.Labels) == 0 { + serviceItem.Labels = map[string]string{} + } + serviceItem.Labels["ADDON_GROUP_ID"] = addonSpec.Name + SetlabelsFromOptions(params.Options, serviceItem.Labels) + // binding data + vol := SetAddonVolumes(params.Options, InfluxDBDataPath, false) + serviceItem.Volumes = diceyml.Volumes{vol} + + serviceMap[strings.Join([]string{addonSpec.Name, strconv.Itoa(0)}, "-")] = &serviceItem + addonDice.Services = serviceMap + + return nil +} + +func (a *Addon) InfluxDBDeployStatus(addonIns *dbclient.AddonInstance, serviceGroup *apistructs.ServiceGroup) (map[string]string, error) { + password, err := a.db.GetByInstanceIDAndField(addonIns.ID, InfluxDBKMSPasswordKey) + if err != nil { + return nil, err + } + + configMap := map[string]string{} + if len(serviceGroup.Services) == 0 { + return nil, errors.New("service group is empty") + } + influxDBService := serviceGroup.Services[0] + configMap[InfluxDBConfHost] = fmt.Sprintf("http://%s:%s", influxDBService.Vip, InfluxDBServicePort) + configMap[InfluxDBConfUser] = InfluxDBInitUserName + configMap[InfluxDBConfORG] = genInfluxDBOrg(addonIns) + configMap[InfluxDBConfPassword] = password.Value + + return configMap, nil +} + +func (a *Addon) influxDBInitRender(params *apistructs.AddonHandlerCreateItem, + addonIns *dbclient.AddonInstance, serviceEnv diceyml.EnvMap) error { + // Init mode + serviceEnv[InfluxDBInitModeKey] = InfluxDBInitMode + + // Username + serviceEnv[InfluxDBInitUserNameKey] = InfluxDBInitUserName + + // Password + password, err := a.savePassword(addonIns, InfluxDBKMSPasswordKey) + if err != nil { + return err + } + serviceEnv[InfluxDBInitPasswordKey] = password + + // Org + org := params.Options[InfluxDBParamsOrg] + if org == "" { + org = genInfluxDBOrg(addonIns) + } + serviceEnv[InfluxDBInitOrgKey] = org + + // Bucket + bucket := params.Options[InfluxDBParamsBucket] + if bucket == "" { + bucket = InfluxDBInitBucket + } + serviceEnv[InfluxDBInitBucketKey] = bucket + + // Retention + retention := params.Options[InfluxDBParamsRetention] + if retention == "" { + retention = InfluxDBInitRetention + } else { + _, err := time.ParseDuration(retention) + if err != nil { + return errors.Wrapf(err, "failed to parse retention time %s", retention) + } + } + serviceEnv[InfluxDBInitRetentionKey] = retention + + return nil +} + +func genInfluxDBOrg(addonIns *dbclient.AddonInstance) string { + // ORG_PROJECT_WORKSPACE + influxDBOrgSource := fmt.Sprintf("%s_%s_%s", addonIns.OrgID, addonIns.ProjectID, addonIns.Workspace) + return fmt.Sprintf("%x", md5.Sum([]byte(influxDBOrgSource))) +} diff --git a/internal/tools/orchestrator/services/addon/addon_influxdb_test.go b/internal/tools/orchestrator/services/addon/addon_influxdb_test.go new file mode 100644 index 00000000000..f074efba8d1 --- /dev/null +++ b/internal/tools/orchestrator/services/addon/addon_influxdb_test.go @@ -0,0 +1,180 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package addon + +import ( + "crypto/md5" + "encoding/hex" + "reflect" + "regexp" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/agiledragon/gomonkey/v2" + "github.com/jinzhu/gorm" + _ "github.com/jinzhu/gorm/dialects/mysql" + + "github.com/erda-project/erda/apistructs" + "github.com/erda-project/erda/bundle" + "github.com/erda-project/erda/internal/tools/orchestrator/dbclient" + "github.com/erda-project/erda/pkg/database/dbengine" + "github.com/erda-project/erda/pkg/kms/kmstypes" + "github.com/erda-project/erda/pkg/parser/diceyml" +) + +func InitAddonSQLMock(t *testing.T) (*gorm.DB, sqlmock.Sqlmock, func() error) { + sqlDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Failed to create sqlmock: %v", err) + } + + gormDB, err := gorm.Open("mysql", sqlDB) + if err != nil { + t.Fatalf("Failed to initialize GORM: %v", err) + } + return gormDB, mock, func() error { + return sqlDB.Close() + } +} + +const ( + instanceID = "e27cc10b88d31483bb901adc75881d589" + encryptPassword = "MDMyMTBhZTY1MDkxY2U4NDAzNGEzYzdhMjNkZGE2MTgxZWIwMTKc1hV+Bkh4n3UkqOhhIA6SOaqeAYMDx7vcFj1+p+fE7ZqhmFpku+XbKfan6w==" +) + +var ( + mockInfluxDBInstance = &dbclient.AddonInstance{ + ID: instanceID, + } + mockInfluxDBServiceGroup = &apistructs.ServiceGroup{ + Dice: apistructs.Dice{ + Services: []apistructs.Service{ + { + Vip: "influxdb.addon-influxdb--cb510e986b12d471c967be012987abb32.svc.cluster.local", + }, + }, + }, + } +) + +func TestDeployStatus(t *testing.T) { + gormDB, mock, closeFn := InitAddonSQLMock(t) + defer closeFn() + + a := Addon{ + db: &dbclient.DBClient{ + DBEngine: &dbengine.DBEngine{ + DB: gormDB, + }, + }, + } + + d := &dbclient.AddonInstanceExtra{ + ID: a.getRandomId(), + InstanceID: instanceID, + Field: InfluxDBKMSPasswordKey, + Value: encryptPassword, + Deleted: apistructs.AddonNotDeleted, + } + + mock.ExpectQuery("SELECT \\* FROM `tb_middle_instance_extra` WHERE \\(instance_id = \\?\\) AND \\(field = \\?\\) AND \\(is_deleted = \\?\\) ORDER BY `tb_middle_instance_extra`\\.`id` ASC LIMIT 1"). + WithArgs(instanceID, InfluxDBKMSPasswordKey, apistructs.AddonNotDeleted). + WillReturnRows(sqlmock.NewRows([]string{"id", "instance_id", "field", "is_deleted", "value"}). + AddRow("1", d.InstanceID, d.Field, apistructs.AddonNotDeleted, d.Value)) + + _, err := a.InfluxDBDeployStatus(mockInfluxDBInstance, mockInfluxDBServiceGroup) + if err != nil { + t.Fatal(err) + } +} + +func TestInfluxDBInitRender(t *testing.T) { + envMap := make(diceyml.EnvMap) + gormDB, mock, closeFn := InitAddonSQLMock(t) + defer closeFn() + + bdl := bundle.New(bundle.WithKMS()) + + a := Addon{ + bdl: bdl, + db: &dbclient.DBClient{ + DBEngine: &dbengine.DBEngine{ + DB: gormDB, + }, + }, + } + + patches := gomonkey.ApplyMethod(reflect.TypeOf(bdl), "KMSEncrypt", func(_ *bundle.Bundle, req apistructs.KMSEncryptRequest) (*kmstypes.EncryptResponse, error) { + return &kmstypes.EncryptResponse{ + KeyID: "fake", + CiphertextBase64: encryptPassword, + }, nil + }) + defer patches.Reset() + + instance := dbclient.AddonInstanceExtra{ + InstanceID: instanceID, + Field: InfluxDBKMSPasswordKey, + Value: encryptPassword, + Deleted: apistructs.AddonNotDeleted, + } + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta("INSERT INTO `tb_middle_instance_extra` (`id`,`instance_id`,`field`,`value`,`is_deleted`,`create_time`,`update_time`) VALUES (?,?,?,?,?,?,?)")). + WithArgs(sqlmock.AnyArg(), instance.InstanceID, instance.Field, instance.Value, instance.Deleted, sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err := a.influxDBInitRender(&apistructs.AddonHandlerCreateItem{ + Options: map[string]string{ + InfluxDBParamsBucket: "new_bucket", + }, + }, mockInfluxDBInstance, envMap) + if err != nil { + t.Fatal(err) + } +} + +func TestGenInfluxDBOrg(t *testing.T) { + tests := []struct { + name string + addonIns *dbclient.AddonInstance + expectedMD5 string + }{ + { + name: "Basic case", + addonIns: &dbclient.AddonInstance{ + OrgID: "org1", + ProjectID: "proj1", + Workspace: "dev", + }, + expectedMD5: calculateMD5("org1_proj1_dev"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := genInfluxDBOrg(tt.addonIns) + if result != tt.expectedMD5 { + t.Errorf("genInfluxDBOrg(%v) = %s; want %s", tt.addonIns, result, tt.expectedMD5) + } + }) + } +} + +func calculateMD5(input string) string { + hash := md5.Sum([]byte(input)) + return hex.EncodeToString(hash[:]) +}