diff --git a/scheduler/config/config.go b/scheduler/config/config.go index c771ff42034..6bd1a4579f8 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -40,6 +40,9 @@ type Config struct { // Server configuration. Server ServerConfig `yaml:"server" mapstructure:"server"` + // Database configuration. + Database DatabaseConfig `yaml:"database" mapstructure:"database"` + // Dynconfig configuration. DynConfig DynConfig `yaml:"dynConfig" mapstructure:"dynConfig"` @@ -112,6 +115,11 @@ type ServerConfig struct { DataDir string `yaml:"dataDir" mapstructure:"dataDir"` } +type DatabaseConfig struct { + // Redis configuration. + Redis RedisConfig `yaml:"redis" mapstructure:"redis"` +} + type SchedulerConfig struct { // Algorithm is scheduling algorithm used by the scheduler. Algorithm string `yaml:"algorithm" mapstructure:"algorithm"` @@ -208,7 +216,7 @@ type JobConfig struct { // Number of workers in local queue. LocalWorkerNum uint `yaml:"localWorkerNum" mapstructure:"localWorkerNum"` - // Redis configuration. + // DEPRECATED: Please use the `database.redis` field instead. Redis RedisConfig `yaml:"redis" mapstructure:"redis"` } @@ -248,6 +256,9 @@ type RedisConfig struct { // BackendDB is backend database name. BackendDB int `yaml:"backendDB" mapstructure:"backendDB"` + + // NetworkTopologyDB is network topology database name. + NetworkTopologyDB int `yaml:"networkTopologyDB" mapstructure:"networkTopologyDB"` } type MetricsConfig struct { @@ -302,9 +313,6 @@ type NetworkTopologyConfig struct { // Enable network topology service, including probe, network topology collection and synchronization service. Enable bool `yaml:"enable" mapstructure:"enable"` - // SyncInterval is the interval of synchronizing network topology between schedulers. - SyncInterval time.Duration `mapstructure:"syncInterval" yaml:"syncInterval"` - // CollectInterval is the interval of collecting network topology. CollectInterval time.Duration `mapstructure:"collectInterval" yaml:"collectInterval"` @@ -342,6 +350,13 @@ func New() *Config { AdvertisePort: DefaultServerAdvertisePort, Host: fqdn.FQDNHostname, }, + Database: DatabaseConfig{ + Redis: RedisConfig{ + BrokerDB: DefaultRedisBrokerDB, + BackendDB: DefaultRedisBackendDB, + NetworkTopologyDB: DefaultNetworkTopologyDB, + }, + }, Scheduler: SchedulerConfig{ Algorithm: DefaultSchedulerAlgorithm, BackToSourceCount: DefaultSchedulerBackToSourceCount, @@ -375,10 +390,6 @@ func New() *Config { GlobalWorkerNum: DefaultJobGlobalWorkerNum, SchedulerWorkerNum: DefaultJobSchedulerWorkerNum, LocalWorkerNum: DefaultJobLocalWorkerNum, - Redis: RedisConfig{ - BrokerDB: DefaultJobRedisBrokerDB, - BackendDB: DefaultJobRedisBackendDB, - }, }, Storage: StorageConfig{ MaxSize: DefaultStorageMaxSize, @@ -405,7 +416,6 @@ func New() *Config { }, NetworkTopology: NetworkTopologyConfig{ Enable: true, - SyncInterval: DefaultNetworkTopologySyncInterval, CollectInterval: DefaultNetworkTopologyCollectInterval, Probe: ProbeConfig{ QueueLength: DefaultProbeQueueLength, @@ -443,6 +453,22 @@ func (cfg *Config) Validate() error { return errors.New("server requires parameter host") } + if len(cfg.Database.Redis.Addrs) == 0 { + return errors.New("redis requires parameter addrs") + } + + if cfg.Database.Redis.BrokerDB < 0 { + return errors.New("redis requires parameter brokerDB") + } + + if cfg.Database.Redis.BackendDB < 0 { + return errors.New("redis requires parameter backendDB") + } + + if cfg.Database.Redis.NetworkTopologyDB < 0 { + return errors.New("redis requires parameter networkTopologyDB") + } + if cfg.Scheduler.Algorithm == "" { return errors.New("scheduler requires parameter algorithm") } @@ -515,18 +541,6 @@ func (cfg *Config) Validate() error { if cfg.Job.LocalWorkerNum == 0 { return errors.New("job requires parameter localWorkerNum") } - - if len(cfg.Job.Redis.Addrs) == 0 { - return errors.New("job requires parameter addrs") - } - - if cfg.Job.Redis.BrokerDB < 0 { - return errors.New("job requires parameter redis brokerDB") - } - - if cfg.Job.Redis.BackendDB < 0 { - return errors.New("job requires parameter redis backendDB") - } } if cfg.Storage.MaxSize <= 0 { @@ -569,10 +583,6 @@ func (cfg *Config) Validate() error { } } - if cfg.NetworkTopology.SyncInterval <= 0 { - return errors.New("networkTopology requires parameter syncInterval") - } - if cfg.NetworkTopology.CollectInterval <= 0 { return errors.New("networkTopology requires parameter collectInterval") } @@ -613,9 +623,39 @@ func (cfg *Config) Convert() error { cfg.Scheduler.RetryBackToSourceLimit = cfg.Scheduler.RetryBackSourceLimit } - // TODO Compatible with deprecated fields host and port. - if len(cfg.Job.Redis.Addrs) == 0 && cfg.Job.Redis.Host != "" && cfg.Job.Redis.Port > 0 { - cfg.Job.Redis.Addrs = []string{fmt.Sprintf("%s:%d", cfg.Job.Redis.Host, cfg.Job.Redis.Port)} + // TODO Compatible with deprecated fields address of redis of job. + if len(cfg.Database.Redis.Addrs) == 0 && len(cfg.Job.Redis.Addrs) != 0 { + cfg.Database.Redis.Addrs = cfg.Job.Redis.Addrs + } + + // TODO Compatible with deprecated fields host and port of redis of job. + if len(cfg.Database.Redis.Addrs) == 0 && len(cfg.Job.Redis.Addrs) == 0 && cfg.Job.Redis.Host != "" && cfg.Job.Redis.Port > 0 { + cfg.Database.Redis.Addrs = []string{fmt.Sprintf("%s:%d", cfg.Job.Redis.Host, cfg.Job.Redis.Port)} + } + + // TODO Compatible with deprecated fields master name of redis of job. + if cfg.Database.Redis.MasterName == "" && cfg.Job.Redis.MasterName != "" { + cfg.Database.Redis.MasterName = cfg.Job.Redis.MasterName + } + + // TODO Compatible with deprecated fields user name of redis of job. + if cfg.Database.Redis.Username == "" && cfg.Job.Redis.Username != "" { + cfg.Database.Redis.Username = cfg.Job.Redis.Username + } + + // TODO Compatible with deprecated fields password of redis of job. + if cfg.Database.Redis.Password == "" && cfg.Job.Redis.Password != "" { + cfg.Database.Redis.Password = cfg.Job.Redis.Password + } + + // TODO Compatible with deprecated fields broker database of redis of job. + if cfg.Database.Redis.BrokerDB == 0 && cfg.Job.Redis.BrokerDB != 0 { + cfg.Database.Redis.BrokerDB = cfg.Job.Redis.BrokerDB + } + + // TODO Compatible with deprecated fields backend database of redis of job. + if cfg.Database.Redis.BackendDB == 0 && cfg.Job.Redis.BackendDB != 0 { + cfg.Database.Redis.BackendDB = cfg.Job.Redis.BackendDB } // TODO Compatible with deprecated fields ip. diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go index 6488c4f57dc..b9e1653046b 100644 --- a/scheduler/config/config_test.go +++ b/scheduler/config/config_test.go @@ -43,14 +43,6 @@ var ( GlobalWorkerNum: DefaultJobGlobalWorkerNum, SchedulerWorkerNum: DefaultJobSchedulerWorkerNum, LocalWorkerNum: DefaultJobLocalWorkerNum, - Redis: RedisConfig{ - Addrs: []string{"127.0.0.1:6379"}, - MasterName: "master", - Username: "foo", - Password: "bar", - BrokerDB: DefaultJobRedisBrokerDB, - BackendDB: DefaultJobRedisBackendDB, - }, } mockMetricsConfig = MetricsConfig{ @@ -68,6 +60,16 @@ var ( ValidityPeriod: DefaultCertValidityPeriod, }, } + + mockRedisConfig = RedisConfig{ + Addrs: []string{"127.0.0.0:6379"}, + MasterName: "master", + Username: "baz", + Password: "bax", + BrokerDB: DefaultRedisBrokerDB, + BackendDB: DefaultRedisBackendDB, + NetworkTopologyDB: DefaultNetworkTopologyDB, + } ) func TestConfig_Load(t *testing.T) { @@ -99,6 +101,18 @@ func TestConfig_Load(t *testing.T) { PluginDir: "foo", DataDir: "foo", }, + Database: DatabaseConfig{ + Redis: RedisConfig{ + Host: "127.0.0.1", + Password: "foo", + Addrs: []string{"foo", "bar"}, + MasterName: "baz", + Port: 6379, + BrokerDB: DefaultRedisBrokerDB, + BackendDB: DefaultRedisBackendDB, + NetworkTopologyDB: DefaultNetworkTopologyDB, + }, + }, DynConfig: DynConfig{ RefreshInterval: 10 * time.Second, }, @@ -121,15 +135,6 @@ func TestConfig_Load(t *testing.T) { GlobalWorkerNum: 1, SchedulerWorkerNum: 1, LocalWorkerNum: 5, - Redis: RedisConfig{ - Addrs: []string{"foo", "bar"}, - MasterName: "baz", - Host: "127.0.0.1", - Port: 6379, - Password: "foo", - BrokerDB: 1, - BackendDB: 2, - }, }, Storage: StorageConfig{ MaxSize: 1, @@ -157,12 +162,11 @@ func TestConfig_Load(t *testing.T) { }, NetworkTopology: NetworkTopologyConfig{ Enable: true, - SyncInterval: 30 * time.Second, CollectInterval: 60 * time.Second, Probe: ProbeConfig{ QueueLength: 5, SyncInterval: 30 * time.Second, - SyncCount: 50, + SyncCount: 10, }, }, Trainer: TrainerConfig{ @@ -194,6 +198,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig }, expect: func(t *testing.T, err error) { @@ -253,11 +258,64 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "server requires parameter host") }, }, + { + name: "redis requires parameter addrs", + config: New(), + mock: func(cfg *Config) { + cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig + cfg.Database.Redis.Addrs = []string{} + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "redis requires parameter addrs") + }, + }, + { + name: "redis requires parameter brokerDB", + config: New(), + mock: func(cfg *Config) { + cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig + cfg.Database.Redis.BrokerDB = -1 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "redis requires parameter brokerDB") + }, + }, + { + name: "redis requires parameter backendDB", + config: New(), + mock: func(cfg *Config) { + cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig + cfg.Database.Redis.BackendDB = -1 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "redis requires parameter backendDB") + }, + }, + { + name: "redis requires parameter networkTopologyDB", + config: New(), + mock: func(cfg *Config) { + cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig + cfg.Database.Redis.NetworkTopologyDB = -1 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "redis requires parameter networkTopologyDB") + }, + }, { name: "scheduler requires parameter algorithm", config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.Algorithm = "" }, @@ -271,6 +329,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.BackToSourceCount = 0 }, @@ -284,6 +343,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.RetryBackToSourceLimit = 0 }, @@ -297,6 +357,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.RetryLimit = 0 }, @@ -310,6 +371,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.RetryInterval = 0 }, @@ -323,6 +385,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.GC.PieceDownloadTimeout = 0 }, @@ -336,6 +399,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.GC.PeerTTL = 0 }, @@ -349,6 +413,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.GC.PeerGCInterval = 0 }, @@ -362,6 +427,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.GC.TaskGCInterval = 0 }, @@ -375,6 +441,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.GC.HostGCInterval = 0 }, @@ -388,6 +455,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Scheduler.GC.HostTTL = 0 }, @@ -401,6 +469,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.DynConfig.RefreshInterval = 0 }, @@ -414,6 +483,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Manager.Addr = "" }, @@ -427,6 +497,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Manager.SchedulerClusterID = 0 }, @@ -440,6 +511,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Manager.KeepAlive.Interval = 0 }, @@ -453,6 +525,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Job.GlobalWorkerNum = 0 }, @@ -466,6 +539,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Job.SchedulerWorkerNum = 0 }, @@ -479,6 +553,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Job.LocalWorkerNum = 0 }, @@ -487,50 +562,12 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "job requires parameter localWorkerNum") }, }, - { - name: "job requires parameter addrs", - config: New(), - mock: func(cfg *Config) { - cfg.Manager = mockManagerConfig - cfg.Job = mockJobConfig - cfg.Job.Redis.Addrs = []string{} - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "job requires parameter addrs") - }, - }, - { - name: "job requires parameter redis brokerDB", - config: New(), - mock: func(cfg *Config) { - cfg.Manager = mockManagerConfig - cfg.Job = mockJobConfig - cfg.Job.Redis.BrokerDB = -1 - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "job requires parameter redis brokerDB") - }, - }, - { - name: "job requires parameter redis backendDB", - config: New(), - mock: func(cfg *Config) { - cfg.Manager = mockManagerConfig - cfg.Job = mockJobConfig - cfg.Job.Redis.BackendDB = -1 - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "job requires parameter redis backendDB") - }, - }, { name: "storage requires parameter maxSize", config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Storage.MaxSize = 0 }, @@ -544,6 +581,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Storage.MaxBackups = 0 }, @@ -557,6 +595,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Storage.BufferSize = 0 }, @@ -570,6 +609,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Metrics = mockMetricsConfig cfg.Metrics.Addr = "" @@ -584,6 +624,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Security = mockSecurityConfig cfg.Security.CACert = "" @@ -598,6 +639,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Security = mockSecurityConfig cfg.Security.TLSPolicy = "" @@ -612,6 +654,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Security = mockSecurityConfig cfg.Security.CertSpec.IPAddresses = []net.IP{} @@ -626,6 +669,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Security = mockSecurityConfig cfg.Security.CertSpec.DNSNames = []string{} @@ -640,6 +684,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Security = mockSecurityConfig cfg.Security.CertSpec.ValidityPeriod = 0 @@ -649,24 +694,12 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "certSpec requires parameter validityPeriod") }, }, - { - name: "networkTopology requires parameter syncInterval", - config: New(), - mock: func(cfg *Config) { - cfg.Manager = mockManagerConfig - cfg.Job = mockJobConfig - cfg.NetworkTopology.SyncInterval = 0 - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "networkTopology requires parameter syncInterval") - }, - }, { name: "networkTopology requires parameter collectInterval", config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.NetworkTopology.CollectInterval = 0 }, @@ -680,6 +713,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.NetworkTopology.Probe.QueueLength = 0 }, @@ -693,6 +727,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.NetworkTopology.Probe.SyncInterval = 0 }, @@ -706,6 +741,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.NetworkTopology.Probe.SyncCount = 0 }, @@ -719,6 +755,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Trainer.Enable = true cfg.Trainer.Addr = "" @@ -733,6 +770,7 @@ func TestConfig_Validate(t *testing.T) { config: New(), mock: func(cfg *Config) { cfg.Manager = mockManagerConfig + cfg.Database.Redis = mockRedisConfig cfg.Job = mockJobConfig cfg.Trainer.Enable = true cfg.Trainer.Interval = 0 diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index ff8dc1cff36..1287a552f36 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -48,6 +48,17 @@ const ( DefaultServerAdvertisePort = 8002 ) +const ( + // DefaultRedisBrokerDB is default db for redis broker. + DefaultRedisBrokerDB = 1 + + // DefaultRedisBackendDB is default db for redis backend. + DefaultRedisBackendDB = 2 + + // DefaultNetworkTopologyDB is default db for network topology. + DefaultNetworkTopologyDB = 3 +) + const ( // DefaultSchedulerAlgorithm is default algorithm for scheduler. DefaultSchedulerAlgorithm = "default" @@ -152,24 +163,20 @@ const ( ) const ( - // TODO(XZ): The default setting needs to be changed after testing. - // DefaultNetworkTopologySyncInterval is the default interval of synchronizing network topology between schedulers. - DefaultNetworkTopologySyncInterval = 30 * time.Second - - // TODO(XZ): The default setting needs to be changed after testing. + // TODO(fcgxz2003): The default setting needs to be changed after testing. // DefaultNetworkTopologyCollectInterval is the default interval of collecting network topology. DefaultNetworkTopologyCollectInterval = 60 * time.Second // DefaultProbeQueueLength is the default length of probe queue in directed graph. DefaultProbeQueueLength = 5 - // TODO(XZ): The default setting needs to be changed after testing. + // TODO(fcgxz2003): The default setting needs to be changed after testing. // DefaultProbeSyncInterval is the default interval of synchronizing host's probes. DefaultProbeSyncInterval = 30 * time.Second - // TODO(XZ): The default setting needs to be changed after testing. + // TODO(fcgxz2003): The default setting needs to be changed after testing. // DefaultProbeSyncCount is the default number of probing hosts. - DefaultProbeSyncCount = 50 + DefaultProbeSyncCount = 10 ) const ( diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml index 1f2db5dc7ee..eb2d660a245 100644 --- a/scheduler/config/testdata/scheduler.yaml +++ b/scheduler/config/testdata/scheduler.yaml @@ -10,6 +10,17 @@ server: pluginDir: foo dataDir: foo +database: + redis: + addrs: [ "foo", "bar" ] + masterName: "baz" + host: 127.0.0.1 + port: 6379 + password: foo + brokerDB: 1 + backendDB: 2 + networkTopologyDB: 3 + scheduler: algorithm: default backToSourceCount: 3 @@ -45,14 +56,6 @@ job: globalWorkerNum: 1 schedulerWorkerNum: 1 localWorkerNum: 5 - redis: - addrs: [ "foo", "bar" ] - masterName: "baz" - host: 127.0.0.1 - port: 6379 - password: foo - brokerDB: 1 - backendDB: 2 storage: maxSize: 1 @@ -81,12 +84,11 @@ network: networkTopology: enable: true - syncInterval: 30s collectInterval: 60s probe: queueLength: 5 syncInterval: 30s - syncCount: 50 + syncCount: 10 trainer: enable: false diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 91dcf57fa5f..a4453ac56d4 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -58,12 +58,12 @@ type job struct { func New(cfg *config.Config, resource resource.Resource) (Job, error) { redisConfig := &internaljob.Config{ - Addrs: cfg.Job.Redis.Addrs, - MasterName: cfg.Job.Redis.MasterName, - Username: cfg.Job.Redis.Username, - Password: cfg.Job.Redis.Password, - BrokerDB: cfg.Job.Redis.BrokerDB, - BackendDB: cfg.Job.Redis.BackendDB, + Addrs: cfg.Database.Redis.Addrs, + MasterName: cfg.Database.Redis.MasterName, + Username: cfg.Database.Redis.Username, + Password: cfg.Database.Redis.Password, + BrokerDB: cfg.Database.Redis.BrokerDB, + BackendDB: cfg.Database.Redis.BackendDB, } globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue) diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go new file mode 100644 index 00000000000..d7360376a59 --- /dev/null +++ b/scheduler/networktopology/network_topology.go @@ -0,0 +1,60 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package networktopology + +import ( + "github.com/go-redis/redis/v8" + + "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/resource" + "d7y.io/dragonfly/v2/scheduler/storage" +) + +// TODO(fcgxz2003): implement the network topology. +type NetworkTopology interface { + // Peek returns the oldest probe without removing it. + Peek(src, dest string) (*Probe, bool) +} + +type networkTopology struct { + // Scheduler config. + config *config.Config + + // Redis universal client interface. + rdb redis.UniversalClient + + // Resource interface. + resource resource.Resource + + // Storage interface. + storage storage.Storage +} + +// New network topology interface. +func NewNetworkTopology(cfg *config.Config, rdb redis.UniversalClient, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) { + return &networkTopology{ + config: cfg, + rdb: rdb, + resource: resource, + storage: storage, + }, nil +} + +// Peek returns the oldest probe without removing it. +func (n *networkTopology) Peek(src, dest string) (*Probe, bool) { + return nil, false +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1a99292aeaf..69f85b0cf7d 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -26,6 +26,7 @@ import ( "path/filepath" "time" + "github.com/go-redis/redis/v8" "github.com/johanbrandhorst/certify" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -39,6 +40,7 @@ import ( "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/pkg/issuer" "d7y.io/dragonfly/v2/pkg/net/ip" + pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/pkg/rpc" managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" securityclient "d7y.io/dragonfly/v2/pkg/rpc/security/client" @@ -47,6 +49,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/job" "d7y.io/dragonfly/v2/scheduler/metrics" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/rpcserver" "d7y.io/dragonfly/v2/scheduler/scheduling" @@ -90,6 +93,9 @@ type Server struct { // Announcer interface. announcer announcer.Announcer + // Network topology interface. + networkTopology networktopology.NetworkTopology + // GC service. gc gc.GC } @@ -97,6 +103,18 @@ type Server struct { func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) { s := &Server{config: cfg} + // Initialize redis client. + rdb, err := pkgredis.NewRedis(&redis.UniversalOptions{ + Addrs: cfg.Database.Redis.Addrs, + MasterName: cfg.Database.Redis.MasterName, + DB: cfg.Database.Redis.NetworkTopologyDB, + Username: cfg.Database.Redis.Username, + Password: cfg.Database.Redis.Password, + }) + if err != nil { + return nil, err + } + // Initialize manager client and dial options of manager grpc client. managerDialOptions := []grpc.DialOption{} if cfg.Security.AutoIssueCert { @@ -226,6 +244,14 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer) } + // Initialize network topology service. + if cfg.NetworkTopology.Enable { + s.networkTopology, err = networktopology.NewNetworkTopology(cfg, rdb, resource, s.storage) + if err != nil { + return nil, err + } + } + return s, nil }