From 87f9fcc5365dcc8e0d6d5a128b487339fbb9e9c4 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 2 Jan 2024 14:53:08 +0800 Subject: [PATCH] feat: remove object storage and model message (#2992) Signed-off-by: Gaius --- go.mod | 2 +- go.sum | 4 +- manager/rpcserver/manager_server_v2.go | 247 +----------------- manager/rpcserver/rpcserver.go | 2 +- pkg/rpc/manager/client/client_v2.go | 34 --- .../manager/client/mocks/client_v2_mock.go | 59 ----- scheduler/service/service_v2.go | 4 - 7 files changed, 10 insertions(+), 342 deletions(-) diff --git a/go.mod b/go.mod index 799746b659d..7395b939b3d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.72 + d7y.io/api/v2 v2.0.73 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 722dea85e01..9fac233abab 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.72 h1:dodisPjCkVp65Li/vyhjZGI5UbyeIdDbRHGyZeFxOQI= -d7y.io/api/v2 v2.0.72/go.mod h1:hiZRuNTy1Tiv7+peJkYloDPm0Sq9GlPTVaiKb2UOhhU= +d7y.io/api/v2 v2.0.73 h1:ZzqErswvYeurqmsXVy4i5sUL8CfSq6n+A/r0UP9wI+M= +d7y.io/api/v2 v2.0.73/go.mod h1:hiZRuNTy1Tiv7+peJkYloDPm0Sq9GlPTVaiKb2UOhhU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index c7c9b23857e..79fcf73964e 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -17,14 +17,10 @@ package rpcserver import ( - "bytes" "context" "encoding/json" "errors" - "fmt" "io" - "strings" - "time" cachev8 "github.com/go-redis/cache/v8" "github.com/go-redis/redis/v8" @@ -34,7 +30,6 @@ import ( "gorm.io/gorm" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" - inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1" managerv2 "d7y.io/api/v2/pkg/apis/manager/v2" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -45,12 +40,8 @@ import ( "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/searcher" "d7y.io/dragonfly/v2/manager/types" - "d7y.io/dragonfly/v2/pkg/digest" - "d7y.io/dragonfly/v2/pkg/idgen" - "d7y.io/dragonfly/v2/pkg/objectstorage" pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/pkg/slices" - "d7y.io/dragonfly/v2/pkg/structure" ) // managerServerV2 is v2 version of the manager grpc server. @@ -69,22 +60,16 @@ type managerServerV2 struct { // Searcher interface. searcher searcher.Searcher - - // Object storage interface. - objectStorage objectstorage.ObjectStorage } // newManagerServerV2 returns v2 version of the manager server. -func newManagerServerV2( - cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher, - objectStorage objectstorage.ObjectStorage) managerv2.ManagerServer { +func newManagerServerV2(cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher) managerv2.ManagerServer { return &managerServerV2{ - config: cfg, - db: database.DB, - rdb: database.RDB, - cache: cache, - searcher: searcher, - objectStorage: objectStorage, + config: cfg, + db: database.DB, + rdb: database.RDB, + cache: cache, + searcher: searcher, } } @@ -152,7 +137,6 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee Ip: seedPeer.IP, Port: seedPeer.Port, DownloadPort: seedPeer.DownloadPort, - ObjectStoragePort: seedPeer.ObjectStoragePort, State: seedPeer.State, SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), SeedPeerCluster: &managerv2.SeedPeerCluster{ @@ -225,7 +209,6 @@ func (s *managerServerV2) ListSeedPeers(ctx context.Context, req *managerv2.List Ip: seedPeer.IP, Port: seedPeer.Port, DownloadPort: seedPeer.DownloadPort, - ObjectStoragePort: seedPeer.ObjectStoragePort, State: seedPeer.State, SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), SeedPeerCluster: &managerv2.SeedPeerCluster{ @@ -271,7 +254,6 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd IP: req.GetIp(), Port: req.GetPort(), DownloadPort: req.GetDownloadPort(), - ObjectStoragePort: req.GetObjectStoragePort(), State: models.SeedPeerStateActive, SeedPeerClusterID: uint(req.GetSeedPeerClusterId()), }).Error; err != nil { @@ -294,7 +276,6 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd Ip: seedPeer.IP, Port: seedPeer.Port, DownloadPort: seedPeer.DownloadPort, - ObjectStoragePort: seedPeer.ObjectStoragePort, State: seedPeer.State, SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), }, nil @@ -310,7 +291,6 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd IP: req.GetIp(), Port: req.GetPort(), DownloadPort: req.GetDownloadPort(), - ObjectStoragePort: req.GetObjectStoragePort(), State: models.SeedPeerStateActive, SeedPeerClusterID: uint(req.GetSeedPeerClusterId()), } @@ -328,7 +308,6 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd Ip: seedPeer.IP, Port: seedPeer.Port, DownloadPort: seedPeer.DownloadPort, - ObjectStoragePort: seedPeer.ObjectStoragePort, SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), State: seedPeer.State, }, nil @@ -412,7 +391,6 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc Ip: seedPeer.IP, Port: seedPeer.Port, DownloadPort: seedPeer.DownloadPort, - ObjectStoragePort: seedPeer.ObjectStoragePort, State: seedPeer.State, SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), SeedPeerCluster: &managerv2.SeedPeerCluster{ @@ -631,7 +609,6 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis Ip: seedPeer.IP, Port: seedPeer.Port, DownloadPort: seedPeer.DownloadPort, - ObjectStoragePort: seedPeer.ObjectStoragePort, State: seedPeer.State, SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID), }) @@ -678,66 +655,6 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis return &pbListSchedulersResponse, nil } -// Get object storage configuration. -func (s *managerServerV2) GetObjectStorage(ctx context.Context, req *managerv2.GetObjectStorageRequest) (*managerv2.ObjectStorage, error) { - if !s.config.ObjectStorage.Enable { - return nil, status.Error(codes.NotFound, "object storage is disabled") - } - - return &managerv2.ObjectStorage{ - Name: s.config.ObjectStorage.Name, - Region: s.config.ObjectStorage.Region, - Endpoint: s.config.ObjectStorage.Endpoint, - AccessKey: s.config.ObjectStorage.AccessKey, - SecretKey: s.config.ObjectStorage.SecretKey, - S3ForcePathStyle: s.config.ObjectStorage.S3ForcePathStyle, - }, nil -} - -// List buckets configuration. -func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBucketsRequest) (*managerv2.ListBucketsResponse, error) { - if !s.config.ObjectStorage.Enable { - return nil, status.Error(codes.NotFound, "object storage is disabled") - } - - log := logger.WithHostnameAndIP(req.Hostname, req.Ip) - var pbListBucketsResponse managerv2.ListBucketsResponse - cacheKey := pkgredis.MakeBucketKeyInManager(s.config.ObjectStorage.Name) - - // Cache hit. - if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil { - log.Warnf("%s cache miss because of %s", cacheKey, err.Error()) - } else { - log.Debugf("%s cache hit", cacheKey) - return &pbListBucketsResponse, nil - } - - // Cache miss and search buckets. - buckets, err := s.objectStorage.ListBucketMetadatas(ctx) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - // Construct schedulers. - for _, bucket := range buckets { - pbListBucketsResponse.Buckets = append(pbListBucketsResponse.Buckets, &managerv2.Bucket{ - Name: bucket.Name, - }) - } - - // Cache data. - if err := s.cache.Once(&cachev8.Item{ - Ctx: ctx, - Key: cacheKey, - Value: &pbListBucketsResponse, - TTL: s.cache.TTL, - }); err != nil { - log.Error(err) - } - - return &pbListBucketsResponse, nil -} - // List applications configuration. func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.ListApplicationsRequest) (*managerv2.ListApplicationsResponse, error) { log := logger.WithHostnameAndIP(req.Hostname, req.Ip) @@ -812,158 +729,6 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L return &pbListApplicationsResponse, nil } -// CreateModel creates model and update data of model to object storage. -func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest) (*emptypb.Empty, error) { - log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp()) - - if !s.config.ObjectStorage.Enable { - log.Warn("object storage is disabled") - return nil, status.Error(codes.Internal, "object storage is disabled") - } - - // Create model bucket, if not exist. - if err := s.createModelBucket(ctx); err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - var ( - name string - typ string - evaluation types.ModelEvaluation - version = time.Now().Nanosecond() - ) - switch createModelRequest := req.GetRequest().(type) { - case *managerv2.CreateModelRequest_CreateGnnRequest: - name = idgen.GNNModelIDV1(req.GetIp(), req.GetHostname()) - typ = models.ModelTypeGNN - evaluation = types.ModelEvaluation{ - Precision: createModelRequest.CreateGnnRequest.GetPrecision(), - Recall: createModelRequest.CreateGnnRequest.GetRecall(), - F1Score: createModelRequest.CreateGnnRequest.GetF1Score(), - } - - // Update GNN model config to object storage. - if err := s.createModelConfig(ctx, name); err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - // Upload GNN model file to object storage. - data := createModelRequest.CreateGnnRequest.GetData() - dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromBytes(data)) - if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName, - types.MakeObjectKeyOfModelFile(name, version), dgst.String(), bytes.NewReader(data)); err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - case *managerv2.CreateModelRequest_CreateMlpRequest: - name = idgen.MLPModelIDV1(req.GetHostname(), req.GetIp()) - typ = models.ModelTypeMLP - evaluation = types.ModelEvaluation{ - MSE: createModelRequest.CreateMlpRequest.GetMse(), - MAE: createModelRequest.CreateMlpRequest.GetMae(), - } - - // Update MLP model config to object storage. - if err := s.createModelConfig(ctx, name); err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - // Upload MLP model file to object storage. - data := createModelRequest.CreateMlpRequest.GetData() - dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromBytes(data)) - if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName, - types.MakeObjectKeyOfModelFile(name, version), dgst.String(), bytes.NewReader(data)); err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - default: - msg := fmt.Sprintf("receive unknow request: %#v", createModelRequest) - log.Error(msg) - return nil, status.Error(codes.FailedPrecondition, msg) - } - - scheduler := models.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, &models.Scheduler{ - Hostname: req.Hostname, - IP: req.Ip, - }).Error; err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - rawEvaluation, err := structure.StructToMap(evaluation) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - // Create model in database. - if err := s.db.WithContext(ctx).Model(&scheduler).Association("Models").Append(&models.Model{ - Type: typ, - Version: fmt.Sprint(version), - State: models.ModelVersionStateInactive, - Evaluation: rawEvaluation, - }); err != nil { - log.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - return new(emptypb.Empty), nil -} - -// createModelBucket creates model bucket if not exist. -func (s *managerServerV2) createModelBucket(ctx context.Context) error { - // Check bucket exist. - isExist, err := s.objectStorage.IsBucketExist(ctx, s.config.Trainer.BucketName) - if err != nil { - return err - } - - // Create bucket if not exist. - if !isExist { - if err := s.objectStorage.CreateBucket(ctx, s.config.Trainer.BucketName); err != nil { - return err - } - } - - return nil -} - -// createModelConfig creates model config to object storage. -func (s *managerServerV2) createModelConfig(ctx context.Context, name string) error { - objectKey := types.MakeObjectKeyOfModelConfigFile(name) - isExist, err := s.objectStorage.IsObjectExist(ctx, s.config.Trainer.BucketName, objectKey) - if err != nil { - return err - } - - // If the model config already exists, skip it. - if isExist { - return nil - } - - // If the model config does not exist, create a new model config. - pbModelConfig := inferencev1.ModelConfig{ - Name: name, - Platform: types.DefaultTritonPlatform, - VersionPolicy: &inferencev1.ModelVersionPolicy{ - PolicyChoice: &inferencev1.ModelVersionPolicy_Specific_{ - Specific: &inferencev1.ModelVersionPolicy_Specific{Versions: []int64{}}, - }, - }, - } - - dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromStrings(pbModelConfig.String())) - if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName, - types.MakeObjectKeyOfModelConfigFile(name), dgst.String(), strings.NewReader(pbModelConfig.String())); err != nil { - return err - } - - return nil -} - // KeepAlive with manager. func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) error { req, err := stream.Recv() diff --git a/manager/rpcserver/rpcserver.go b/manager/rpcserver/rpcserver.go index 067be24a1ed..16a486732a6 100644 --- a/manager/rpcserver/rpcserver.go +++ b/manager/rpcserver/rpcserver.go @@ -122,7 +122,7 @@ func New( return s, managerserver.New( newManagerServerV1(s.config, database, s.cache, s.searcher, s.objectStorage), - newManagerServerV2(s.config, database, s.cache, s.searcher, s.objectStorage), + newManagerServerV2(s.config, database, s.cache, s.searcher), newSecurityServerV1(s.selfSignedCert), s.serverOptions...), nil } diff --git a/pkg/rpc/manager/client/client_v2.go b/pkg/rpc/manager/client/client_v2.go index 94f1bf42cb2..a52da6d29c4 100644 --- a/pkg/rpc/manager/client/client_v2.go +++ b/pkg/rpc/manager/client/client_v2.go @@ -105,18 +105,9 @@ type V2 interface { // List acitve schedulers configuration. ListSchedulers(context.Context, *managerv2.ListSchedulersRequest, ...grpc.CallOption) (*managerv2.ListSchedulersResponse, error) - // Get object storage configuration. - GetObjectStorage(context.Context, *managerv2.GetObjectStorageRequest, ...grpc.CallOption) (*managerv2.ObjectStorage, error) - - // List buckets configuration. - ListBuckets(context.Context, *managerv2.ListBucketsRequest, ...grpc.CallOption) (*managerv2.ListBucketsResponse, error) - // List applications configuration. ListApplications(context.Context, *managerv2.ListApplicationsRequest, ...grpc.CallOption) (*managerv2.ListApplicationsResponse, error) - // Create model and update data of model to object storage. - CreateModel(context.Context, *managerv2.CreateModelRequest, ...grpc.CallOption) error - // KeepAlive with manager. KeepAlive(time.Duration, *managerv2.KeepAliveRequest, <-chan struct{}, ...grpc.CallOption) @@ -180,22 +171,6 @@ func (v *v2) ListSchedulers(ctx context.Context, req *managerv2.ListSchedulersRe return v.ManagerClient.ListSchedulers(ctx, req, opts...) } -// Get object storage configuration. -func (v *v2) GetObjectStorage(ctx context.Context, req *managerv2.GetObjectStorageRequest, opts ...grpc.CallOption) (*managerv2.ObjectStorage, error) { - ctx, cancel := context.WithTimeout(ctx, contextTimeout) - defer cancel() - - return v.ManagerClient.GetObjectStorage(ctx, req, opts...) -} - -// List buckets configuration. -func (v *v2) ListBuckets(ctx context.Context, req *managerv2.ListBucketsRequest, opts ...grpc.CallOption) (*managerv2.ListBucketsResponse, error) { - ctx, cancel := context.WithTimeout(ctx, contextTimeout) - defer cancel() - - return v.ManagerClient.ListBuckets(ctx, req, opts...) -} - // List applications configuration. func (v *v2) ListApplications(ctx context.Context, req *managerv2.ListApplicationsRequest, opts ...grpc.CallOption) (*managerv2.ListApplicationsResponse, error) { ctx, cancel := context.WithTimeout(ctx, contextTimeout) @@ -204,15 +179,6 @@ func (v *v2) ListApplications(ctx context.Context, req *managerv2.ListApplicatio return v.ManagerClient.ListApplications(ctx, req, opts...) } -// Create model and update data of model to object storage. -func (v *v2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest, opts ...grpc.CallOption) error { - ctx, cancel := context.WithTimeout(ctx, createModelContextTimeout) - defer cancel() - - _, err := v.ManagerClient.CreateModel(ctx, req, opts...) - return err -} - // List acitve schedulers configuration. func (v *v2) KeepAlive(interval time.Duration, keepalive *managerv2.KeepAliveRequest, done <-chan struct{}, opts ...grpc.CallOption) { log := logger.WithKeepAlive(keepalive.Hostname, keepalive.Ip, keepalive.SourceType.Enum().String(), keepalive.ClusterId) diff --git a/pkg/rpc/manager/client/mocks/client_v2_mock.go b/pkg/rpc/manager/client/mocks/client_v2_mock.go index 94bdb8e79e7..8953dc25c05 100644 --- a/pkg/rpc/manager/client/mocks/client_v2_mock.go +++ b/pkg/rpc/manager/client/mocks/client_v2_mock.go @@ -55,25 +55,6 @@ func (mr *MockV2MockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockV2)(nil).Close)) } -// CreateModel mocks base method. -func (m *MockV2) CreateModel(arg0 context.Context, arg1 *manager.CreateModelRequest, arg2 ...grpc.CallOption) error { - m.ctrl.T.Helper() - varargs := []any{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "CreateModel", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateModel indicates an expected call of CreateModel. -func (mr *MockV2MockRecorder) CreateModel(arg0, arg1 any, arg2 ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateModel", reflect.TypeOf((*MockV2)(nil).CreateModel), varargs...) -} - // DeleteSeedPeer mocks base method. func (m *MockV2) DeleteSeedPeer(arg0 context.Context, arg1 *manager.DeleteSeedPeerRequest, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() @@ -93,26 +74,6 @@ func (mr *MockV2MockRecorder) DeleteSeedPeer(arg0, arg1 any, arg2 ...any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSeedPeer", reflect.TypeOf((*MockV2)(nil).DeleteSeedPeer), varargs...) } -// GetObjectStorage mocks base method. -func (m *MockV2) GetObjectStorage(arg0 context.Context, arg1 *manager.GetObjectStorageRequest, arg2 ...grpc.CallOption) (*manager.ObjectStorage, error) { - m.ctrl.T.Helper() - varargs := []any{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetObjectStorage", varargs...) - ret0, _ := ret[0].(*manager.ObjectStorage) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetObjectStorage indicates an expected call of GetObjectStorage. -func (mr *MockV2MockRecorder) GetObjectStorage(arg0, arg1 any, arg2 ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectStorage", reflect.TypeOf((*MockV2)(nil).GetObjectStorage), varargs...) -} - // GetScheduler mocks base method. func (m *MockV2) GetScheduler(arg0 context.Context, arg1 *manager.GetSchedulerRequest, arg2 ...grpc.CallOption) (*manager.Scheduler, error) { m.ctrl.T.Helper() @@ -170,26 +131,6 @@ func (mr *MockV2MockRecorder) ListApplications(arg0, arg1 any, arg2 ...any) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListApplications", reflect.TypeOf((*MockV2)(nil).ListApplications), varargs...) } -// ListBuckets mocks base method. -func (m *MockV2) ListBuckets(arg0 context.Context, arg1 *manager.ListBucketsRequest, arg2 ...grpc.CallOption) (*manager.ListBucketsResponse, error) { - m.ctrl.T.Helper() - varargs := []any{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "ListBuckets", varargs...) - ret0, _ := ret[0].(*manager.ListBucketsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListBuckets indicates an expected call of ListBuckets. -func (mr *MockV2MockRecorder) ListBuckets(arg0, arg1 any, arg2 ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListBuckets", reflect.TypeOf((*MockV2)(nil).ListBuckets), varargs...) -} - // ListSchedulers mocks base method. func (m *MockV2) ListSchedulers(arg0 context.Context, arg1 *manager.ListSchedulersRequest, arg2 ...grpc.CallOption) (*manager.ListSchedulersResponse, error) { m.ctrl.T.Helper() diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index d1fe0d344ae..b342d0d3d2d 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -551,10 +551,6 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID))) } - if req.Host.GetObjectStoragePort() != 0 { - options = append(options, resource.WithObjectStoragePort(req.Host.GetObjectStoragePort())) - } - host = resource.NewHost( req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),