Skip to content

Commit

Permalink
fix: update delete task rpc and create e2e test. (#3447)
Browse files Browse the repository at this point in the history
Signed-off-by: Asklv <[email protected]>
  • Loading branch information
IRONICBo authored Sep 19, 2024
1 parent e61d6a1 commit c1dad7c
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 83 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.1.55-alpha
image-tag: v2.1.56
chart-name: manager
- module: scheduler
image: scheduler
image-tag: v2.1.55-alpha
image-tag: v2.1.56
chart-name: scheduler
- module: client
image: client
Expand Down
42 changes: 35 additions & 7 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package job
import (
"time"

"github.com/bits-and-blooms/bitset"

nethttp "d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
)

Expand Down Expand Up @@ -65,7 +69,25 @@ type GetTaskRequest struct {

// GetTaskResponse defines the response parameters for getting task.
type GetTaskResponse struct {
Peers []*resource.Peer `json:"peers"`
Peers []*Peer `json:"peers"`
SchedulerClusterID uint `json:"scheduler_cluster_id"`
}

// Peer represents the peer information.
type Peer struct {
ID string `json:"id"`
Config *config.ResourceConfig `json:"config,omitempty"`
Range *nethttp.Range `json:"range,omitempty"`
Priority int32 `json:"priority"`
Pieces map[int32]*resource.Piece `json:"pieces,omitempty"`
FinishedPieces *bitset.BitSet `json:"finished_pieces,omitempty"`
PieceCosts []time.Duration `json:"piece_costs"`
Cost time.Duration `json:"cost,omitempty"`
BlockParents []string `json:"block_parents"`
NeedBackToSource bool `json:"need_back_to_source"`
PieceUpdatedAt time.Time `json:"piece_updated_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

// DeleteTaskRequest defines the request parameters for deleting task.
Expand All @@ -76,12 +98,18 @@ type DeleteTaskRequest struct {

// DeleteTaskResponse defines the response parameters for deleting task.
type DeleteTaskResponse struct {
SuccessPeers []*DeletePeerResponse `json:"success_peers"`
FailurePeers []*DeletePeerResponse `json:"failure_peers"`
SuccessPeers []*DeleteSuccessPeer `json:"success_peers"`
FailurePeers []*DeleteFailurePeer `json:"failure_peers"`
SchedulerClusterID uint `json:"scheduler_cluster_id"`
}

// DeletePeerResponse represents the response after attempting to delete a peer.
type DeletePeerResponse struct {
Peer *resource.Peer `json:"peer"`
Description string `json:"description"`
// DeleteSuccessPeer defines the response parameters for deleting peer successfully.
type DeleteSuccessPeer struct {
Peer
}

// DeleteFailurePeer defines the response parameters for deleting peer failed.
type DeleteFailurePeer struct {
Peer
Description string `json:"description"`
}
6 changes: 3 additions & 3 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestHandlers_CreateJob(t *testing.T) {
},
},
{
name: "success",
name: "create preheat job success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockPreheatJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreatePreheatJob(gomock.Any(), gomock.Eq(mockPreheatCreateJobRequest)).Return(mockPreheatJobModel, nil).Times(1)
Expand All @@ -152,7 +152,7 @@ func TestHandlers_CreateJob(t *testing.T) {
},
},
{
name: "success",
name: "create get task job success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockGetTaskJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateGetTaskJob(gomock.Any(), gomock.Eq(mockCreateGetTaskJobRequest)).Return(mockGetTaskJobModel, nil).Times(1)
Expand All @@ -167,7 +167,7 @@ func TestHandlers_CreateJob(t *testing.T) {
},
},
{
name: "success",
name: "create delete task job success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockCreateDeleteTaskJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
Expand Down
39 changes: 21 additions & 18 deletions manager/job/mocks/task_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 36 additions & 13 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import (

// Task is an interface for manager tasks.
type Task interface {
// CreateDeleteTask create a delete task job
// CreateDeleteTask create a delete task job.
CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTaskArgs) (*internaljob.GroupJobState, error)

// CreateGetTask create a get task job
// CreateGetTask create a get task job.
CreateGetTask(context.Context, []models.Scheduler, types.GetTaskArgs) (*internaljob.GroupJobState, error)
}

Expand All @@ -53,7 +53,7 @@ func newTask(job *internaljob.Job) Task {
return &task{job}
}

// CreateDeleteTask create a delete task job
// CreateDeleteTask create a delete task job.
func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTaskArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
Expand All @@ -66,16 +66,45 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul
return nil, err
}

// Initialize queues.
queues, err := getSchedulerQueues(schedulers)
if err != nil {
return nil, err
}

return t.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues)
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.DeleteTaskJob,
RoutingKey: queue.String(),
Args: args,
})
}

group, err := machineryv1tasks.NewGroup(signatures...)
if err != nil {
return nil, err
}

var tasks []machineryv1tasks.Signature
for _, signature := range signatures {
tasks = append(tasks, *signature)
}

logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create preheat group %s failed", group.GroupUUID, err)
return nil, err
}

return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: time.Now(),
}, nil
}

// CreateGetTask create a get task job
// CreateGetTask create a get task job.
func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, json types.GetTaskArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanGetTask, trace.WithSpanKind(trace.SpanKindProducer))
Expand All @@ -88,22 +117,16 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,
return nil, err
}

// Initialize queues.
queues, err := getSchedulerQueues(schedulers)
if err != nil {
return nil, err
}

return t.createGroupJob(ctx, internaljob.GetTaskJob, args, queues)
}

// createGroupJob creates a group job.
func (t *task) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: name,
Name: internaljob.GetTaskJob,
RoutingKey: queue.String(),
Args: args,
})
Expand Down
Loading

0 comments on commit c1dad7c

Please sign in to comment.