Skip to content

Commit

Permalink
feat: support preheating by v2 grpc protocol (#3201)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Apr 17, 2024
1 parent 501a7be commit 4abb179
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 46 deletions.
1 change: 1 addition & 0 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type PreheatRequest struct {
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
PieceLength uint32 `json:"pieceLength" validate:"omitempty"`
}

type PreheatResponse struct {
Expand Down
2 changes: 2 additions & 0 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
URL: json.URL,
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
PieceLength: json.PieceLength,
Headers: json.Headers,
},
}
Expand Down Expand Up @@ -304,6 +305,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
URL: image.blobsURL(v.Digest.String()),
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
PieceLength: args.PieceLength,
Headers: nethttp.HeaderToMap(header),
}

Expand Down
4 changes: 4 additions & 0 deletions manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
return nil, err
}

if json.Args.PieceLength == 0 {
json.Args.PieceLength = types.DefaultPreheatJobPieceLength
}

groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions manager/service/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (s *service) CreateV1Preheat(ctx context.Context, json types.CreateV1Prehea
Type: json.Type,
URL: json.URL,
FilteredQueryParams: json.FilteredQueryParams,
PieceLength: types.DefaultPreheatJobPieceLength,
Headers: json.Headers,
},
})
Expand Down
8 changes: 8 additions & 0 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package types

const (
	// DefaultPreheatJobPieceLength is the default piece length for
	// preheating: 4 * 1024 * 1024 = 4194304 (presumably bytes, i.e. 4 MiB).
	// It is left untyped so it can be assigned to any integer field wide
	// enough to hold it.
	DefaultPreheatJobPieceLength = 4 << 20
)

type CreateJobRequest struct {
BIO string `json:"bio" binding:"omitempty"`
Type string `json:"type" binding:"required"`
Expand Down Expand Up @@ -65,6 +70,9 @@ type PreheatArgs struct {
// FilteredQueryParams is the filtered query params for preheating.
FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"`

// PieceLength is the piece length for preheating.
PieceLength uint32 `json:"pieceLength" binding:"omitempty"`

// Headers is the http headers for authentication.
Headers map[string]string `json:"headers" binding:"omitempty"`

Expand Down
4 changes: 2 additions & 2 deletions scheduler/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ func (mc *managerClient) Get() (any, error) {
Ip: mc.config.Server.AdvertiseIP.String(),
})
if err != nil {
if s, ok := status.FromError(err); ok {
if st, ok := status.FromError(err); ok {
// TODO Compatible with old version manager.
if slices.Contains([]codes.Code{codes.Unimplemented, codes.NotFound}, s.Code()) {
if slices.Contains([]codes.Code{codes.Unimplemented, codes.NotFound}, st.Code()) {
return DynconfigData{
Scheduler: getSchedulerResp,
Applications: nil,
Expand Down
121 changes: 91 additions & 30 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/RichardKnop/machinery/v1"
"github.com/go-http-utils/headers"
"github.com/go-playground/validator/v10"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1"
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
)
Expand Down Expand Up @@ -148,8 +151,9 @@ func (j *job) Serve() {
}()
}

// preheat is a job to preheat.
func (j *job) preheat(ctx context.Context, req string) error {
// preheat is a job to preheat. Preheating with range requests
// is not supported.
func (j *job) preheat(ctx context.Context, data string) error {
ctx, cancel := context.WithTimeout(ctx, preheatTimeout)
defer cancel()

Expand All @@ -163,63 +167,120 @@ func (j *job) preheat(ctx context.Context, req string) error {
return fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
}

preheat := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, preheat); err != nil {
logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req)
req := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(data, req); err != nil {
logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data)
return err
}

if err := validator.New().Struct(preheat); err != nil {
logger.Errorf("preheat %s validate failed: %s", preheat.URL, err.Error())
if err := validator.New().Struct(req); err != nil {
logger.Errorf("preheat %s validate failed: %s", req.URL, err.Error())
return err
}

urlMeta := &commonv1.UrlMeta{
Digest: preheat.Digest,
Tag: preheat.Tag,
Filter: preheat.FilteredQueryParams,
Header: preheat.Headers,
Application: preheat.Application,
Priority: commonv1.Priority(preheat.Priority),
}
if preheat.Headers != nil {
if r, ok := preheat.Headers[headers.Range]; ok {
// Range in dragonfly is without "bytes=".
urlMeta.Range = strings.TrimPrefix(r, http.RangePrefix)
// Preheat by v2 grpc protocol. If seed peer does not support
// v2 protocol, preheat by v1 grpc protocol.
if err := j.preheatV2(ctx, req); err != nil {
logger.Errorf("preheat %s failed: %s", req.URL, err.Error())

if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
if err := j.preheatV1(ctx, req); err != nil {
return err
}

return nil
}
}

return err
}

return nil
}

// preheatV1 preheats job by v1 grpc protocol.
func (j *job) preheatV1(ctx context.Context, req *internaljob.PreheatRequest) error {
urlMeta := &commonv1.UrlMeta{
Digest: req.Digest,
Tag: req.Tag,
Filter: req.FilteredQueryParams,
Header: req.Headers,
Application: req.Application,
Priority: commonv1.Priority(req.Priority),
}

// Trigger seed peer download seeds.
taskID := idgen.TaskIDV1(preheat.URL, urlMeta)
log := logger.WithTask(taskID, preheat.URL)
log.Infof("preheat %s tag: %s, range: %s, filtered query params: %s, digest: %s",
preheat.URL, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
log.Debugf("preheat %s headers: %#v", preheat.URL, urlMeta.Header)
taskID := idgen.TaskIDV1(req.URL, urlMeta)
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat(v1) %s tag: %s, filtered query params: %s, digest: %s, headers: %#v",
req.URL, urlMeta.Tag, urlMeta.Filter, urlMeta.Digest, urlMeta.Header)

stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
TaskId: taskID,
Url: preheat.URL,
Url: req.URL,
UrlMeta: urlMeta,
})
if err != nil {
log.Errorf("preheat %s failed: %s", preheat.URL, err.Error())
log.Errorf("preheat(v1) %s failed: %s", req.URL, err.Error())
return err
}

for {
piece, err := stream.Recv()
if err != nil {
log.Errorf("preheat %s recive piece failed: %s", preheat.URL, err.Error())
log.Errorf("preheat(v1) %s recive piece failed: %s", req.URL, err.Error())
return err
}

if piece.Done == true {
log.Infof("preheat %s succeeded", preheat.URL)
log.Infof("preheat(v1) %s succeeded", req.URL)
return nil
}
}
}

// preheatV2 preheats job by v2 grpc protocol.
//
// It derives the v2 task ID from the request, asks the seed peer client to
// download the task and then drains the response stream until it ends.
// A stream that ends with io.EOF means the preheat succeeded. Any other
// error is logged with the task-scoped logger and returned as-is (not
// wrapped), so the caller can still inspect the gRPC status it carries.
func (j *job) preheatV2(ctx context.Context, req *internaljob.PreheatRequest) error {
	// NOTE: strings.Split returns a one-element slice holding "" when
	// FilteredQueryParams is empty. The same slice feeds both the task ID
	// and the download request, so it is intentionally passed through
	// unchanged to keep the two consistent.
	filteredQueryParams := strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)
	taskID := idgen.TaskIDV2(req.URL, req.Digest, req.Tag, req.Application, int32(req.PieceLength), filteredQueryParams)

	log := logger.WithTask(taskID, req.URL)
	log.Infof("preheat(v2) %s tag: %s, filtered query params: %s, digest: %s, headers: %#v",
		req.URL, req.Tag, req.FilteredQueryParams, req.Digest, req.Headers)

	stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
		Download: &commonv2.Download{
			Url:                 req.URL,
			Digest:              &req.Digest,
			Type:                commonv2.TaskType_DFDAEMON,
			Tag:                 &req.Tag,
			Application:         &req.Application,
			Priority:            commonv2.Priority(req.Priority),
			FilteredQueryParams: filteredQueryParams,
			RequestHeader:       req.Headers,
			PieceLength:         req.PieceLength,
		},
	})
	if err != nil {
		// Use the task-scoped logger so the failure is attributed to the task.
		log.Errorf("preheat(v2) %s failed: %s", req.URL, err.Error())
		return err
	}

	// Wait for the download task to complete. The received messages are
	// discarded; only the end of the stream matters here.
	for {
		if _, err := stream.Recv(); err != nil {
			if errors.Is(err, io.EOF) {
				log.Infof("preheat(v2) %s succeeded", req.URL)
				return nil
			}

			log.Errorf("preheat(v2) %s recive piece failed: %s", req.URL, err.Error())
			return err
		}
	}
}

// syncPeers is a job to sync peers.
func (j *job) syncPeers() (string, error) {
var hosts []*resource.Host
Expand Down
37 changes: 23 additions & 14 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
host, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
if err != nil {
return err
}
Expand All @@ -836,7 +836,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
// If scheduler trigger seed peer download back-to-source,
// the needBackToSource flag should be true.
case download.GetNeedBackToSource():
peer.Log.Infof("peer need back to source")
peer.Log.Info("peer need back to source")
peer.NeedBackToSource.Store(true)
// If task is pending, failed, leave, or succeeded and has no available peer,
// scheduler trigger seed peer download back-to-source.
Expand All @@ -845,18 +845,27 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
task.FSM.Is(resource.TaskStateLeave) ||
task.FSM.Is(resource.TaskStateSucceeded) &&
!task.HasAvailablePeer(blocklist):
// If trigger the seed peer download back-to-source,
// the need back-to-source flag should be true.
download.NeedBackToSource = true

// Output path should be empty, prevent the seed peer
// copy file to output path.
download.OutputPath = nil
if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
return err

// If HostType is normal, trigger seed peer download back-to-source.
if host.Type == types.HostTypeNormal {
// If trigger the seed peer download back-to-source,
// the need back-to-source flag should be true.
download.NeedBackToSource = true

// Output path should be empty, prevent the seed peer
// copy file to output path.
download.OutputPath = nil
if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
return err
}
} else {
// If HostType is not normal, peer is seed peer, and
// trigger seed peer download back-to-source directly.
peer.Log.Info("peer need back to source")
peer.NeedBackToSource.Store(true)
}
}

Expand Down

0 comments on commit 4abb179

Please sign in to comment.