Skip to content

Commit

Permalink
refactor: get task job and delete task job (#3522)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Sep 20, 2024
1 parent fbb3ee3 commit 7bcbac9
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 420 deletions.
48 changes: 20 additions & 28 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ package job

import (
"time"

"github.com/bits-and-blooms/bitset"

nethttp "d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
)

// PreheatRequest defines the request parameters for preheating.
Expand Down Expand Up @@ -64,7 +58,8 @@ type PreheatFailureTask struct {

// GetTaskRequest defines the request parameters for getting task.
type GetTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
TaskID string `json:"task_id" validate:"required"`
Timeout time.Duration `json:"timeout" validate:"omitempty"`
}

// GetTaskResponse defines the response parameters for getting task.
Expand All @@ -75,19 +70,12 @@ type GetTaskResponse struct {

// Peer represents the peer information.
type Peer struct {
ID string `json:"id"`
Config *config.ResourceConfig `json:"config,omitempty"`
Range *nethttp.Range `json:"range,omitempty"`
Priority int32 `json:"priority"`
Pieces map[int32]*resource.Piece `json:"pieces,omitempty"`
FinishedPieces *bitset.BitSet `json:"finished_pieces,omitempty"`
PieceCosts []time.Duration `json:"piece_costs"`
Cost time.Duration `json:"cost,omitempty"`
BlockParents []string `json:"block_parents"`
NeedBackToSource bool `json:"need_back_to_source"`
PieceUpdatedAt time.Time `json:"piece_updated_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID string `json:"id"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
HostType string `json:"host_type"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

// DeleteTaskRequest defines the request parameters for deleting task.
Expand All @@ -98,18 +86,22 @@ type DeleteTaskRequest struct {

// DeleteTaskResponse defines the response parameters for deleting task.
type DeleteTaskResponse struct {
SuccessPeers []*DeleteSuccessPeer `json:"success_peers"`
FailurePeers []*DeleteFailurePeer `json:"failure_peers"`
SuccessTasks []*DeleteSuccessTask `json:"success_tasks"`
FailureTasks []*DeleteFailureTask `json:"failure_tasks"`
SchedulerClusterID uint `json:"scheduler_cluster_id"`
}

// DeleteSuccessPeer defines the response parameters for deleting peer successfully.
type DeleteSuccessPeer struct {
Peer
// DeleteSuccessTask defines the response parameters for deleting peer successfully.
type DeleteSuccessTask struct {
Hostname string `json:"hostname"`
IP string `json:"ip"`
HostType string `json:"host_type"`
}

// DeleteFailurePeer defines the response parameters for deleting peer failed.
type DeleteFailurePeer struct {
Peer
// DeleteFailureTask defines the response parameters for deleting peer failed.
type DeleteFailureTask struct {
Hostname string `json:"hostname"`
IP string `json:"ip"`
HostType string `json:"host_type"`
Description string `json:"description"`
}
3 changes: 1 addition & 2 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return nil, err
}

task := newTask(j)
return &Job{
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
Task: task,
Task: newTask(j),
GC: gc,
}, nil
}
Expand Down
39 changes: 18 additions & 21 deletions manager/job/mocks/task_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 17 additions & 17 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (

// Task is an interface for manager tasks.
type Task interface {
// CreateDeleteTask create a delete task job.
CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTaskArgs) (*internaljob.GroupJobState, error)

// CreateGetTask create a get task job.
CreateGetTask(context.Context, []models.Scheduler, types.GetTaskArgs) (*internaljob.GroupJobState, error)

// CreateDeleteTask create a delete task job.
CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTaskArgs) (*internaljob.GroupJobState, error)
}

// task is an implementation of Task.
Expand All @@ -53,16 +53,16 @@ func newTask(job *internaljob.Job) Task {
return &task{job}
}

// CreateDeleteTask create a delete task job.
func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTaskArgs) (*internaljob.GroupJobState, error) {
// CreateGetTask create a get task job.
func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, json types.GetTaskArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
ctx, span = tracer.Start(ctx, config.SpanGetTask, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
if err != nil {
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
logger.Errorf("get tasks marshal request: %v, error: %v", args, err)
return nil, err
}

Expand All @@ -75,7 +75,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.DeleteTaskJob,
Name: internaljob.GetTaskJob,
RoutingKey: queue.String(),
Args: args,
})
Expand All @@ -93,7 +93,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul

logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create preheat group %s failed", group.GroupUUID, err)
logger.Errorf("create task group %s failed", group.GroupUUID, err)
return nil, err
}

Expand All @@ -104,16 +104,16 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul
}, nil
}

// CreateGetTask create a get task job.
func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, json types.GetTaskArgs) (*internaljob.GroupJobState, error) {
// CreateDeleteTask create a delete task job.
func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTaskArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanGetTask, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID))
ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
if err != nil {
logger.Errorf("list tasks marshal request: %v, error: %v", args, err)
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
return nil, err
}

Expand All @@ -126,7 +126,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.GetTaskJob,
Name: internaljob.DeleteTaskJob,
RoutingKey: queue.String(),
Args: args,
})
Expand All @@ -144,7 +144,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,

logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create task group %s failed", group.GroupUUID, err)
logger.Errorf("create preheat group %s failed", group.GroupUUID, err)
return nil, err
}

Expand Down
Loading

0 comments on commit 7bcbac9

Please sign in to comment.