Skip to content

Commit

Permalink
feat: remove object storage and model message (#2992)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 2, 2024
1 parent eb9a92a commit 87f9fcc
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 342 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.72
d7y.io/api/v2 v2.0.73
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.72 h1:dodisPjCkVp65Li/vyhjZGI5UbyeIdDbRHGyZeFxOQI=
d7y.io/api/v2 v2.0.72/go.mod h1:hiZRuNTy1Tiv7+peJkYloDPm0Sq9GlPTVaiKb2UOhhU=
d7y.io/api/v2 v2.0.73 h1:ZzqErswvYeurqmsXVy4i5sUL8CfSq6n+A/r0UP9wI+M=
d7y.io/api/v2 v2.0.73/go.mod h1:hiZRuNTy1Tiv7+peJkYloDPm0Sq9GlPTVaiKb2UOhhU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
Expand Down
247 changes: 6 additions & 241 deletions manager/rpcserver/manager_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
package rpcserver

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"time"

cachev8 "github.com/go-redis/cache/v8"
"github.com/go-redis/redis/v8"
Expand All @@ -34,7 +30,6 @@ import (
"gorm.io/gorm"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1"
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand All @@ -45,12 +40,8 @@ import (
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/searcher"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/digest"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/objectstorage"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/pkg/slices"
"d7y.io/dragonfly/v2/pkg/structure"
)

// managerServerV2 is v2 version of the manager grpc server.
Expand All @@ -69,22 +60,16 @@ type managerServerV2 struct {

// Searcher interface.
searcher searcher.Searcher

// Object storage interface.
objectStorage objectstorage.ObjectStorage
}

// newManagerServerV2 returns v2 version of the manager server.
func newManagerServerV2(
cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher,
objectStorage objectstorage.ObjectStorage) managerv2.ManagerServer {
func newManagerServerV2(cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher) managerv2.ManagerServer {
return &managerServerV2{
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
searcher: searcher,
objectStorage: objectStorage,
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
searcher: searcher,
}
}

Expand Down Expand Up @@ -152,7 +137,6 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
SeedPeerCluster: &managerv2.SeedPeerCluster{
Expand Down Expand Up @@ -225,7 +209,6 @@ func (s *managerServerV2) ListSeedPeers(ctx context.Context, req *managerv2.List
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
SeedPeerCluster: &managerv2.SeedPeerCluster{
Expand Down Expand Up @@ -271,7 +254,6 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd
IP: req.GetIp(),
Port: req.GetPort(),
DownloadPort: req.GetDownloadPort(),
ObjectStoragePort: req.GetObjectStoragePort(),
State: models.SeedPeerStateActive,
SeedPeerClusterID: uint(req.GetSeedPeerClusterId()),
}).Error; err != nil {
Expand All @@ -294,7 +276,6 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
}, nil
Expand All @@ -310,7 +291,6 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd
IP: req.GetIp(),
Port: req.GetPort(),
DownloadPort: req.GetDownloadPort(),
ObjectStoragePort: req.GetObjectStoragePort(),
State: models.SeedPeerStateActive,
SeedPeerClusterID: uint(req.GetSeedPeerClusterId()),
}
Expand All @@ -328,7 +308,6 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
State: seedPeer.State,
}, nil
Expand Down Expand Up @@ -412,7 +391,6 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
SeedPeerCluster: &managerv2.SeedPeerCluster{
Expand Down Expand Up @@ -631,7 +609,6 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis
Ip: seedPeer.IP,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
ObjectStoragePort: seedPeer.ObjectStoragePort,
State: seedPeer.State,
SeedPeerClusterId: uint64(seedPeer.SeedPeerClusterID),
})
Expand Down Expand Up @@ -678,66 +655,6 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis
return &pbListSchedulersResponse, nil
}

// GetObjectStorage returns the object storage settings held in the manager
// configuration. It answers with codes.NotFound when object storage is not
// enabled in that configuration.
func (s *managerServerV2) GetObjectStorage(ctx context.Context, req *managerv2.GetObjectStorageRequest) (*managerv2.ObjectStorage, error) {
	if enabled := s.config.ObjectStorage.Enable; !enabled {
		return nil, status.Error(codes.NotFound, "object storage is disabled")
	}

	// Copy the configured values into the response message one field at a time.
	objectStorage := new(managerv2.ObjectStorage)
	objectStorage.Name = s.config.ObjectStorage.Name
	objectStorage.Region = s.config.ObjectStorage.Region
	objectStorage.Endpoint = s.config.ObjectStorage.Endpoint
	objectStorage.AccessKey = s.config.ObjectStorage.AccessKey
	objectStorage.SecretKey = s.config.ObjectStorage.SecretKey
	objectStorage.S3ForcePathStyle = s.config.ObjectStorage.S3ForcePathStyle

	return objectStorage, nil
}

// ListBuckets returns the names of the buckets reported by the object storage
// backend. It answers with codes.NotFound when object storage is disabled and
// with codes.Internal when the backend listing fails. Results are served from
// the manager cache when present and written back to it after a backend lookup.
func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBucketsRequest) (*managerv2.ListBucketsResponse, error) {
	if !s.config.ObjectStorage.Enable {
		return nil, status.Error(codes.NotFound, "object storage is disabled")
	}

	log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
	resp := &managerv2.ListBucketsResponse{}
	cacheKey := pkgredis.MakeBucketKeyInManager(s.config.ObjectStorage.Name)

	// Try the cache first; a successful read is returned as-is.
	cacheErr := s.cache.Get(ctx, cacheKey, resp)
	if cacheErr == nil {
		log.Debugf("%s cache hit", cacheKey)
		return resp, nil
	}
	log.Warnf("%s cache miss because of %s", cacheKey, cacheErr.Error())

	// Cache miss: ask the object storage backend for the bucket metadata.
	metadatas, err := s.objectStorage.ListBucketMetadatas(ctx)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	// Build the bucket list of the response from the backend metadata.
	for _, metadata := range metadatas {
		pbBucket := &managerv2.Bucket{Name: metadata.Name}
		resp.Buckets = append(resp.Buckets, pbBucket)
	}

	// Store the response in the cache; a caching failure is only logged and
	// does not fail the request.
	item := &cachev8.Item{
		Ctx:   ctx,
		Key:   cacheKey,
		Value: resp,
		TTL:   s.cache.TTL,
	}
	if err := s.cache.Once(item); err != nil {
		log.Error(err)
	}

	return resp, nil
}

// List applications configuration.
func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.ListApplicationsRequest) (*managerv2.ListApplicationsResponse, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
Expand Down Expand Up @@ -812,158 +729,6 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L
return &pbListApplicationsResponse, nil
}

// CreateModel stores a model uploaded by a scheduler and records it in the
// database.
//
// The steps are, in order: make sure the model bucket exists, derive the model
// name, type and evaluation from the request variant (GNN or MLP), write the
// model config and model file to object storage, look up the scheduler by the
// request's hostname and IP, and append an inactive model record to that
// scheduler.
//
// It returns codes.Internal when object storage is disabled or when any
// storage or database step fails, and codes.FailedPrecondition when the
// request carries neither a GNN nor an MLP payload.
func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest) (*emptypb.Empty, error) {
	log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp())

	if !s.config.ObjectStorage.Enable {
		log.Warn("object storage is disabled")
		return nil, status.Error(codes.Internal, "object storage is disabled")
	}

	// Create model bucket, if not exist.
	if err := s.createModelBucket(ctx); err != nil {
		log.Error(err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	var (
		name       string
		typ        string
		data       []byte
		evaluation types.ModelEvaluation

		// NOTE(review): Nanosecond() is only the offset within the current
		// second (0..999999999), so versions are neither unique nor
		// increasing over time. If a timestamp was intended, UnixNano() is
		// the likely candidate — confirm against the consumers of the
		// version before changing it.
		version = time.Now().Nanosecond()
	)
	switch createModelRequest := req.GetRequest().(type) {
	case *managerv2.CreateModelRequest_CreateGnnRequest:
		name = idgen.GNNModelIDV1(req.GetIp(), req.GetHostname())
		typ = models.ModelTypeGNN
		data = createModelRequest.CreateGnnRequest.GetData()
		evaluation = types.ModelEvaluation{
			Precision: createModelRequest.CreateGnnRequest.GetPrecision(),
			Recall:    createModelRequest.CreateGnnRequest.GetRecall(),
			F1Score:   createModelRequest.CreateGnnRequest.GetF1Score(),
		}
	case *managerv2.CreateModelRequest_CreateMlpRequest:
		// NOTE(review): the arguments are (hostname, ip) here but (ip,
		// hostname) for the GNN id above — verify against the idgen
		// signatures that this ordering is intended.
		name = idgen.MLPModelIDV1(req.GetHostname(), req.GetIp())
		typ = models.ModelTypeMLP
		data = createModelRequest.CreateMlpRequest.GetData()
		evaluation = types.ModelEvaluation{
			MSE: createModelRequest.CreateMlpRequest.GetMse(),
			MAE: createModelRequest.CreateMlpRequest.GetMae(),
		}
	default:
		msg := fmt.Sprintf("receive unknow request: %#v", createModelRequest)
		log.Error(msg)
		return nil, status.Error(codes.FailedPrecondition, msg)
	}

	// Write the model config (if missing) and the model file to object storage.
	if err := s.putModelObjects(ctx, name, version, data); err != nil {
		log.Error(err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	// Find the scheduler that owns the model.
	scheduler := models.Scheduler{}
	if err := s.db.WithContext(ctx).First(&scheduler, &models.Scheduler{
		Hostname: req.GetHostname(),
		IP:       req.GetIp(),
	}).Error; err != nil {
		log.Error(err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	rawEvaluation, err := structure.StructToMap(evaluation)
	if err != nil {
		log.Error(err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	// Create model in database; new models start in the inactive state.
	if err := s.db.WithContext(ctx).Model(&scheduler).Association("Models").Append(&models.Model{
		Type:       typ,
		Version:    fmt.Sprint(version),
		State:      models.ModelVersionStateInactive,
		Evaluation: rawEvaluation,
	}); err != nil {
		log.Error(err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	return new(emptypb.Empty), nil
}

// putModelObjects creates the config object for the named model when it does
// not exist yet, then uploads data as the model file of the given version,
// passing the SHA256 digest of data along with the upload.
func (s *managerServerV2) putModelObjects(ctx context.Context, name string, version int, data []byte) error {
	if err := s.createModelConfig(ctx, name); err != nil {
		return err
	}

	dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromBytes(data))
	return s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName,
		types.MakeObjectKeyOfModelFile(name, version), dgst.String(), bytes.NewReader(data))
}

// createModelBucket makes sure the trainer bucket named in the configuration
// exists in object storage, creating it when the existence check reports it
// missing. Any error from the check or from the creation is returned unchanged.
func (s *managerServerV2) createModelBucket(ctx context.Context) error {
	bucketName := s.config.Trainer.BucketName

	found, err := s.objectStorage.IsBucketExist(ctx, bucketName)
	if err != nil {
		return err
	}

	// Nothing to do when the bucket is already there.
	if found {
		return nil
	}

	return s.objectStorage.CreateBucket(ctx, bucketName)
}

// createModelConfig writes the config object for the named model to the
// trainer bucket unless an object with that key already exists. The config
// carries the model name, the default platform constant and a "specific"
// version policy with an empty version list.
func (s *managerServerV2) createModelConfig(ctx context.Context, name string) error {
	bucketName := s.config.Trainer.BucketName
	objectKey := types.MakeObjectKeyOfModelConfigFile(name)

	found, err := s.objectStorage.IsObjectExist(ctx, bucketName, objectKey)
	if err != nil {
		return err
	}

	// An existing config is left untouched.
	if found {
		return nil
	}

	// No config yet, so build a new one.
	specific := &inferencev1.ModelVersionPolicy_Specific{Versions: []int64{}}
	modelConfig := &inferencev1.ModelConfig{
		Name:     name,
		Platform: types.DefaultTritonPlatform,
		VersionPolicy: &inferencev1.ModelVersionPolicy{
			PolicyChoice: &inferencev1.ModelVersionPolicy_Specific_{Specific: specific},
		},
	}

	// The stored content is the message's String() form; the digest passed to
	// the upload is computed over that same text.
	// NOTE(review): protobuf-go documents String() output as not guaranteed
	// stable — confirm it is acceptable as a persisted format.
	content := modelConfig.String()
	dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromStrings(content))

	return s.objectStorage.PutObject(ctx, bucketName, objectKey, dgst.String(), strings.NewReader(content))
}

// KeepAlive with manager.
func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) error {
req, err := stream.Recv()
Expand Down
2 changes: 1 addition & 1 deletion manager/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func New(

return s, managerserver.New(
newManagerServerV1(s.config, database, s.cache, s.searcher, s.objectStorage),
newManagerServerV2(s.config, database, s.cache, s.searcher, s.objectStorage),
newManagerServerV2(s.config, database, s.cache, s.searcher),
newSecurityServerV1(s.selfSignedCert),
s.serverOptions...), nil
}
Expand Down
Loading

0 comments on commit 87f9fcc

Please sign in to comment.