From c1dad7c6b4b59e1291b7fb66703322739d4d11b9 Mon Sep 17 00:00:00 2001 From: Asklv <47499836+IRONICBo@users.noreply.github.com> Date: Thu, 19 Sep 2024 14:36:59 +0800 Subject: [PATCH] fix: update delete task rpc and create e2e test. (#3447) Signed-off-by: Asklv --- .github/workflows/compatibility-e2e-v2.yml | 4 +- internal/job/types.go | 42 +++- manager/handlers/job_test.go | 6 +- manager/job/mocks/task_mock.go | 39 ++-- manager/job/task.go | 49 +++-- manager/job/task_test.go | 202 ++++++++++++++++++ manager/service/job.go | 81 +++++-- pkg/rpc/dfdaemon/client/client_v2.go | 25 +++ .../dfdaemon/client/mocks/client_v2_mock.go | 39 ++++ scheduler/job/job.go | 92 ++++++-- scheduler/resource/seed_peer_client_mock.go | 39 ++++ test/e2e/v2/manager/preheat.go | 8 +- test/e2e/v2/manager/task.go | 137 ++++++++++++ test/e2e/v2/util/task.go | 14 ++ 14 files changed, 694 insertions(+), 83 deletions(-) create mode 100644 manager/job/task_test.go create mode 100644 test/e2e/v2/manager/task.go diff --git a/.github/workflows/compatibility-e2e-v2.yml b/.github/workflows/compatibility-e2e-v2.yml index 0bd64a388fc..3cf898f4f23 100644 --- a/.github/workflows/compatibility-e2e-v2.yml +++ b/.github/workflows/compatibility-e2e-v2.yml @@ -31,11 +31,11 @@ jobs: include: - module: manager image: manager - image-tag: v2.1.55-alpha + image-tag: v2.1.56 chart-name: manager - module: scheduler image: scheduler - image-tag: v2.1.55-alpha + image-tag: v2.1.56 chart-name: scheduler - module: client image: client diff --git a/internal/job/types.go b/internal/job/types.go index ff40032e908..179ecc02608 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -19,6 +19,10 @@ package job import ( "time" + "github.com/bits-and-blooms/bitset" + + nethttp "d7y.io/dragonfly/v2/pkg/net/http" + "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" ) @@ -65,7 +69,25 @@ type GetTaskRequest struct { // GetTaskResponse defines the response parameters for getting task. type GetTaskResponse struct { - Peers []*resource.Peer `json:"peers"` + Peers []*Peer `json:"peers"` + SchedulerClusterID uint `json:"scheduler_cluster_id"` +} + +// Peer represents the peer information. +type Peer struct { + ID string `json:"id"` + Config *config.ResourceConfig `json:"config,omitempty"` + Range *nethttp.Range `json:"range,omitempty"` + Priority int32 `json:"priority"` + Pieces map[int32]*resource.Piece `json:"pieces,omitempty"` + FinishedPieces *bitset.BitSet `json:"finished_pieces,omitempty"` + PieceCosts []time.Duration `json:"piece_costs"` + Cost time.Duration `json:"cost,omitempty"` + BlockParents []string `json:"block_parents"` + NeedBackToSource bool `json:"need_back_to_source"` + PieceUpdatedAt time.Time `json:"piece_updated_at"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // DeleteTaskRequest defines the request parameters for deleting task. @@ -76,12 +98,18 @@ type DeleteTaskRequest struct { // DeleteTaskResponse defines the response parameters for deleting task. type DeleteTaskResponse struct { - SuccessPeers []*DeletePeerResponse `json:"success_peers"` - FailurePeers []*DeletePeerResponse `json:"failure_peers"` + SuccessPeers []*DeleteSuccessPeer `json:"success_peers"` + FailurePeers []*DeleteFailurePeer `json:"failure_peers"` + SchedulerClusterID uint `json:"scheduler_cluster_id"` } -// DeletePeerResponse represents the response after attempting to delete a peer. 
-type DeletePeerResponse struct { - Peer *resource.Peer `json:"peer"` - Description string `json:"description"` +// DeleteSuccessPeer defines the response parameters for deleting peer successfully. +type DeleteSuccessPeer struct { + Peer +} + +// DeleteFailurePeer defines the response parameters for deleting peer failed. +type DeleteFailurePeer struct { + Peer + Description string `json:"description"` } diff --git a/manager/handlers/job_test.go b/manager/handlers/job_test.go index 9eb311f9a3c..80dbdc379fa 100644 --- a/manager/handlers/job_test.go +++ b/manager/handlers/job_test.go @@ -137,7 +137,7 @@ func TestHandlers_CreateJob(t *testing.T) { }, }, { - name: "success", + name: "create preheat job success", req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockPreheatJobReqBody)), mock: func(ms *mocks.MockServiceMockRecorder) { ms.CreatePreheatJob(gomock.Any(), gomock.Eq(mockPreheatCreateJobRequest)).Return(mockPreheatJobModel, nil).Times(1) @@ -152,7 +152,7 @@ func TestHandlers_CreateJob(t *testing.T) { }, }, { - name: "success", + name: "create get task job success", req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockGetTaskJobReqBody)), mock: func(ms *mocks.MockServiceMockRecorder) { ms.CreateGetTaskJob(gomock.Any(), gomock.Eq(mockCreateGetTaskJobRequest)).Return(mockGetTaskJobModel, nil).Times(1) @@ -167,7 +167,7 @@ func TestHandlers_CreateJob(t *testing.T) { }, }, { - name: "success", + name: "create delete task job success", req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)), mock: func(ms *mocks.MockServiceMockRecorder) { ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockCreateDeleteTaskJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1) diff --git a/manager/job/mocks/task_mock.go b/manager/job/mocks/task_mock.go index 82b59c2670e..6bd44961a62 100644 --- a/manager/job/mocks/task_mock.go +++ b/manager/job/mocks/task_mock.go @@ -16,6 +16,7 @@ import ( job "d7y.io/dragonfly/v2/internal/job" models "d7y.io/dragonfly/v2/manager/models" types "d7y.io/dragonfly/v2/manager/types" + tasks "github.com/RichardKnop/machinery/v1/tasks" gomock "go.uber.org/mock/gomock" ) @@ -42,32 +43,34 @@ func (m *MockTask) EXPECT() *MockTaskMockRecorder { return m.recorder } -// CreateDeleteTask mocks base method. -func (m *MockTask) CreateDeleteTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.DeleteTaskArgs) (*job.GroupJobState, error) { +// DeleteTask mocks base method. +func (m *MockTask) DeleteTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.DeleteTaskArgs) (*tasks.Group, map[string]*job.DeleteTaskResponse, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateDeleteTask", arg0, arg1, arg2) - ret0, _ := ret[0].(*job.GroupJobState) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret := m.ctrl.Call(m, "DeleteTask", arg0, arg1, arg2) + ret0, _ := ret[0].(*tasks.Group) + ret1, _ := ret[1].(map[string]*job.DeleteTaskResponse) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } -// CreateDeleteTask indicates an expected call of CreateDeleteTask. -func (mr *MockTaskMockRecorder) CreateDeleteTask(arg0, arg1, arg2 any) *gomock.Call { +// DeleteTask indicates an expected call of DeleteTask. 
+func (mr *MockTaskMockRecorder) DeleteTask(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTask", reflect.TypeOf((*MockTask)(nil).CreateDeleteTask), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockTask)(nil).DeleteTask), arg0, arg1, arg2) } -// CreateGetTask mocks base method. -func (m *MockTask) CreateGetTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.GetTaskArgs) (*job.GroupJobState, error) { +// GetTask mocks base method. +func (m *MockTask) GetTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.GetTaskArgs) (*tasks.Group, map[string]*job.GetTaskResponse, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateGetTask", arg0, arg1, arg2) - ret0, _ := ret[0].(*job.GroupJobState) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret := m.ctrl.Call(m, "GetTask", arg0, arg1, arg2) + ret0, _ := ret[0].(*tasks.Group) + ret1, _ := ret[1].(map[string]*job.GetTaskResponse) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } -// CreateGetTask indicates an expected call of CreateGetTask. -func (mr *MockTaskMockRecorder) CreateGetTask(arg0, arg1, arg2 any) *gomock.Call { +// GetTask indicates an expected call of GetTask. +func (mr *MockTaskMockRecorder) GetTask(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGetTask", reflect.TypeOf((*MockTask)(nil).CreateGetTask), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTask", reflect.TypeOf((*MockTask)(nil).GetTask), arg0, arg1, arg2) } diff --git a/manager/job/task.go b/manager/job/task.go index 5cfb8efdde1..47959b24f9f 100644 --- a/manager/job/task.go +++ b/manager/job/task.go @@ -36,10 +36,10 @@ import ( // Task is an interface for manager tasks. type Task interface { - // CreateDeleteTask create a delete task job + // CreateDeleteTask create a delete task job. CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTaskArgs) (*internaljob.GroupJobState, error) - // CreateGetTask create a get task job + // CreateGetTask create a get task job. CreateGetTask(context.Context, []models.Scheduler, types.GetTaskArgs) (*internaljob.GroupJobState, error) } @@ -53,7 +53,7 @@ func newTask(job *internaljob.Job) Task { return &task{job} } -// CreateDeleteTask create a delete task job +// CreateDeleteTask create a delete task job. func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTaskArgs) (*internaljob.GroupJobState, error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer)) @@ -66,16 +66,45 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul return nil, err } - // Initialize queues. queues, err := getSchedulerQueues(schedulers) if err != nil { return nil, err } - return t.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues) + var signatures []*machineryv1tasks.Signature + for _, queue := range queues { + signatures = append(signatures, &machineryv1tasks.Signature{ + UUID: fmt.Sprintf("task_%s", uuid.New().String()), + Name: internaljob.DeleteTaskJob, + RoutingKey: queue.String(), + Args: args, + }) + } + + group, err := machineryv1tasks.NewGroup(signatures...) 
+	if err != nil {
+		return nil, err
+	}
+
+	var tasks []machineryv1tasks.Signature
+	for _, signature := range signatures {
+		tasks = append(tasks, *signature)
+	}
+
+	logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
+	if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
+		logger.Errorf("create delete task group %s failed: %v", group.GroupUUID, err)
+		return nil, err
+	}
+
+	return &internaljob.GroupJobState{
+		GroupUUID: group.GroupUUID,
+		State:     machineryv1tasks.StatePending,
+		CreatedAt: time.Now(),
+	}, nil
 }
 
-// CreateGetTask create a get task job
+// CreateGetTask create a get task job.
 func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, json types.GetTaskArgs) (*internaljob.GroupJobState, error) {
 	var span trace.Span
 	ctx, span = tracer.Start(ctx, config.SpanGetTask, trace.WithSpanKind(trace.SpanKindProducer))
@@ -88,22 +117,16 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,
 		return nil, err
 	}
 
-	// Initialize queues.
 	queues, err := getSchedulerQueues(schedulers)
 	if err != nil {
 		return nil, err
 	}
 
-	return t.createGroupJob(ctx, internaljob.GetTaskJob, args, queues)
-}
-
-// createGroupJob creates a group job.
-func (t *task) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
 	var signatures []*machineryv1tasks.Signature
 	for _, queue := range queues {
 		signatures = append(signatures, &machineryv1tasks.Signature{
 			UUID:       fmt.Sprintf("task_%s", uuid.New().String()),
-			Name:       name,
+			Name:       internaljob.GetTaskJob,
 			RoutingKey: queue.String(),
 			Args:       args,
 		})
diff --git a/manager/job/task_test.go b/manager/job/task_test.go
new file mode 100644
index 00000000000..70d070a6d9b
--- /dev/null
+++ b/manager/job/task_test.go
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2024 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package job + +import ( + "context" + "errors" + "testing" + + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" + testifyassert "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "d7y.io/dragonfly/v2/internal/job" + internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/job/mocks" + "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/manager/types" +) + +func TestDeleteTask(t *testing.T) { + tests := []struct { + name string + setupMocks func(mockTask *mocks.MockTask) + ctx context.Context + schedulers []models.Scheduler + args types.DeleteTaskArgs + expect func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.DeleteTaskResponse, err error) + }{ + { + name: "DeleteTask succeeds", + setupMocks: func(mockTask *mocks.MockTask) { + expectedGroup := &machineryv1tasks.Group{ + GroupUUID: "test-group-uuid", + } + expectedResponses := map[string]*job.DeleteTaskResponse{ + "scheduler1": { + SuccessPeers: []*job.DeleteSuccessPeer{ + { + Peer: internaljob.Peer{ID: "peer1"}, + }, + }, + FailurePeers: []*job.DeleteFailurePeer{}, + }, + "scheduler2": { + SuccessPeers: []*job.DeleteSuccessPeer{}, + FailurePeers: []*job.DeleteFailurePeer{ + { + Peer: internaljob.Peer{ID: "peer2"}, + Description: "Failed to delete", + }, + }, + }, + } + mockTask.EXPECT().DeleteTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedGroup, expectedResponses, nil) + }, + ctx: context.TODO(), + schedulers: []models.Scheduler{ + {Hostname: "scheduler1"}, + {Hostname: "scheduler2"}, + }, + args: types.DeleteTaskArgs{ + TaskID: "test-task-id", + }, + expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.DeleteTaskResponse, err error) { + assert := testifyassert.New(t) + assert.NoError(err) + assert.Equal("test-group-uuid", group.GroupUUID) + assert.Equal("peer1", responses["scheduler1"].SuccessPeers[0].Peer.ID) + assert.Equal("peer2", responses["scheduler2"].FailurePeers[0].Peer.ID) + }, + }, + { + name: "DeleteTask fails", + setupMocks: func(mockTask *mocks.MockTask) { + mockTask.EXPECT().DeleteTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("delete task error")) + }, + ctx: context.TODO(), + schedulers: []models.Scheduler{ + {Hostname: "scheduler1"}, + }, + args: types.DeleteTaskArgs{ + TaskID: "test-task-id", + }, + expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.DeleteTaskResponse, err error) { + assert := testifyassert.New(t) + assert.Error(err) + assert.Nil(group) + assert.Nil(responses) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockTask := mocks.NewMockTask(ctrl) + tc.setupMocks(mockTask) + + group, responses, err := mockTask.DeleteTask(tc.ctx, tc.schedulers, tc.args) + tc.expect(t, group, responses, err) + }) + } +} + +func TestGetTask(t *testing.T) { + tests := []struct { + name string + setupMocks func(mockTask *mocks.MockTask) + ctx context.Context + schedulers []models.Scheduler + args types.GetTaskArgs + expect func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.GetTaskResponse, err error) + }{ + { + name: "GetTask succeeds", + setupMocks: func(mockTask *mocks.MockTask) { + expectedGroup := &machineryv1tasks.Group{ + GroupUUID: "test-group-uuid", + } + expectedResponses := map[string]*job.GetTaskResponse{ + "scheduler1": { + Peers: []*internaljob.Peer{ + {ID: "peer1"}, + {ID: "peer2"}, + }, + }, + 
"scheduler2": { + Peers: []*internaljob.Peer{ + {ID: "peer3"}, + {ID: "peer4"}, + }, + }, + } + mockTask.EXPECT().GetTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedGroup, expectedResponses, nil) + }, + ctx: context.TODO(), + schedulers: []models.Scheduler{ + {Hostname: "scheduler1"}, + {Hostname: "scheduler2"}, + }, + args: types.GetTaskArgs{ + TaskID: "test-task-id", + }, + expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.GetTaskResponse, err error) { + assert := testifyassert.New(t) + assert.NoError(err) + assert.Equal("test-group-uuid", group.GroupUUID) + assert.Equal("peer1", responses["scheduler1"].Peers[0].ID) + assert.Equal("peer3", responses["scheduler2"].Peers[0].ID) + }, + }, + { + name: "GetTask fails", + setupMocks: func(mockTask *mocks.MockTask) { + mockTask.EXPECT().GetTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("get task error")) + }, + ctx: context.TODO(), + schedulers: []models.Scheduler{ + {Hostname: "scheduler1"}, + }, + args: types.GetTaskArgs{ + TaskID: "test-task-id", + }, + expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.GetTaskResponse, err error) { + assert := testifyassert.New(t) + assert.Error(err) + assert.Nil(group) + assert.Nil(responses) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockTask := mocks.NewMockTask(ctrl) + tc.setupMocks(mockTask) + + group, responses, err := mockTask.GetTask(tc.ctx, tc.schedulers, tc.args) + tc.expect(t, group, responses, err) + }) + } +} diff --git a/manager/service/job.go b/manager/service/job.go index 6272e66c848..40c78eca19b 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -93,19 +93,19 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele return nil, err } - candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + allSchedulers, err := s.findAllSchedulers(ctx, json.SchedulerClusterIDs) if err != nil { return nil, err } - groupJobState, err := s.job.CreateDeleteTask(ctx, candidateSchedulers, json.Args) - if err != nil { - return nil, err + var allSchedulerClusters []models.SchedulerCluster + for _, allScheduler := range allSchedulers { + allSchedulerClusters = append(allSchedulerClusters, allScheduler.SchedulerCluster) } - var candidateSchedulerClusters []models.SchedulerCluster - for _, candidateScheduler := range candidateSchedulers { - candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + groupJobState, err := s.job.CreateDeleteTask(ctx, allSchedulers, json.Args) + if err != nil { + return nil, err } job := models.Job{ @@ -115,7 +115,7 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele State: groupJobState.State, Args: args, UserID: json.UserID, - SchedulerClusters: candidateSchedulerClusters, + SchedulerClusters: allSchedulerClusters, } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { @@ -127,22 +127,22 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele } func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTaskJobRequest) (*models.Job, error) { - candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + allSchedulers, err := s.findAllSchedulers(ctx, json.SchedulerClusterIDs) if err != nil { return nil, err } - groupJobState, err := 
s.job.CreateGetTask(ctx, candidateSchedulers, json.Args) - if err != nil { - return nil, err + var allSchedulerClusters []models.SchedulerCluster + for _, allScheduler := range allSchedulers { + allSchedulerClusters = append(allSchedulerClusters, allScheduler.SchedulerCluster) } - var candidateSchedulerClusters []models.SchedulerCluster - for _, candidateScheduler := range candidateSchedulers { - candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + args, err := structure.StructToMap(json.Args) + if err != nil { + return nil, err } - args, err := structure.StructToMap(json.Args) + groupJobState, err := s.job.CreateGetTask(ctx, allSchedulers, json.Args) if err != nil { return nil, err } @@ -154,7 +154,7 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask State: groupJobState.State, Args: args, UserID: json.UserID, - SchedulerClusters: candidateSchedulerClusters, + SchedulerClusters: allSchedulerClusters, } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { @@ -165,6 +165,53 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask return &job, nil } +func (s *service) findAllSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { + var availableSchedulers []models.Scheduler + if len(schedulerClusterIDs) != 0 { + // Find the scheduler clusters by request. + for _, schedulerClusterID := range schedulerClusterIDs { + schedulerCluster := models.SchedulerCluster{} + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { + return nil, err + } + + var schedulers []models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: schedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + return nil, err + } + + availableSchedulers = append(availableSchedulers, schedulers...) + } + } else { + // Find all of the scheduler clusters that has active schedulers. + var availableSchedulersClusters []models.SchedulerCluster + if err := s.db.WithContext(ctx).Find(&availableSchedulersClusters).Error; err != nil { + return nil, err + } + + for _, availableSchedulersCluster := range availableSchedulersClusters { + var schedulers []models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: availableSchedulersCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + continue + } + + availableSchedulers = append(availableSchedulers, schedulers...) 
+ } + } + + if len(availableSchedulers) == 0 { + return nil, errors.New("available schedulers not found") + } + + return availableSchedulers, nil +} + func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { var candidateSchedulers []models.Scheduler if len(schedulerClusterIDs) != 0 { diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go index a92f1ea4eec..a5e88330e0b 100644 --- a/pkg/rpc/dfdaemon/client/client_v2.go +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -28,6 +28,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/credentials/insecure" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" @@ -102,6 +103,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), ), )), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), @@ -129,6 +131,12 @@ type V2 interface { // DownloadTask downloads task from p2p network. DownloadTask(context.Context, string, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadTaskClient, error) + // StatTask stats task information. + StatTask(context.Context, *dfdaemonv2.StatTaskRequest, ...grpc.CallOption) (*commonv2.Task, error) + + // DeleteTask deletes task from p2p network. + DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error + // DownloadCacheTask downloads cache task from p2p network. DownloadCacheTask(context.Context, *dfdaemonv2.DownloadCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadCacheTaskClient, error) @@ -182,6 +190,23 @@ func (v *v2) DownloadTask(ctx context.Context, taskID string, req *dfdaemonv2.Do ) } +// StatTask stats task information. +func (v *v2) StatTask(ctx context.Context, req *dfdaemonv2.StatTaskRequest, opts ...grpc.CallOption) (*commonv2.Task, error) { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + return v.DfdaemonUploadClient.StatTask(ctx, req, opts...) +} + +// DeleteTask deletes task from p2p network. +func (v *v2) DeleteTask(ctx context.Context, req *dfdaemonv2.DeleteTaskRequest, opts ...grpc.CallOption) error { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + _, err := v.DfdaemonUploadClient.DeleteTask(ctx, req, opts...) + return err +} + // DownloadCacheTask downloads cache task from p2p network. func (v *v2) DownloadCacheTask(ctx context.Context, req *dfdaemonv2.DownloadCacheTaskRequest, opts ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadCacheTaskClient, error) { return v.DfdaemonUploadClient.DownloadCacheTask(ctx, req, opts...) diff --git a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go index 32fa7176ef1..b77c004accd 100644 --- a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go +++ b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go @@ -75,6 +75,25 @@ func (mr *MockV2MockRecorder) DeleteCacheTask(arg0, arg1 any, arg2 ...any) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteCacheTask", reflect.TypeOf((*MockV2)(nil).DeleteCacheTask), varargs...) } +// DeleteTask mocks base method. 
+func (m *MockV2) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error {
+	m.ctrl.T.Helper()
+	varargs := []any{arg0, arg1}
+	for _, a := range arg2 {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "DeleteTask", varargs...)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// DeleteTask indicates an expected call of DeleteTask.
+func (mr *MockV2MockRecorder) DeleteTask(arg0, arg1 any, arg2 ...any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	varargs := append([]any{arg0, arg1}, arg2...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV2)(nil).DeleteTask), varargs...)
+}
+
 // DownloadCacheTask mocks base method.
 func (m *MockV2) DownloadCacheTask(arg0 context.Context, arg1 *dfdaemon.DownloadCacheTaskRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_DownloadCacheTaskClient, error) {
 	m.ctrl.T.Helper()
@@ -155,6 +174,26 @@ func (mr *MockV2MockRecorder) StatCacheTask(arg0, arg1 any, arg2 ...any) *gomock
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatCacheTask", reflect.TypeOf((*MockV2)(nil).StatCacheTask), varargs...)
 }
 
+// StatTask mocks base method.
+func (m *MockV2) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) (*common.Task, error) {
+	m.ctrl.T.Helper()
+	varargs := []any{arg0, arg1}
+	for _, a := range arg2 {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "StatTask", varargs...)
+	ret0, _ := ret[0].(*common.Task)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// StatTask indicates an expected call of StatTask.
+func (mr *MockV2MockRecorder) StatTask(arg0, arg1 any, arg2 ...any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	varargs := append([]any{arg0, arg1}, arg2...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockV2)(nil).StatTask), varargs...)
+}
+
 // SyncPieces mocks base method.
 func (m *MockV2) SyncPieces(arg0 context.Context, arg1 *dfdaemon.SyncPiecesRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_SyncPiecesClient, error) {
 	m.ctrl.T.Helper()
diff --git a/scheduler/job/job.go b/scheduler/job/job.go
index aec0daf1f70..2b2ae40a403 100644
--- a/scheduler/job/job.go
+++ b/scheduler/job/job.go
@@ -495,12 +495,21 @@ func (j *job) getTask(ctx context.Context, data string) (string, error) {
 
 	task, ok := j.resource.TaskManager().Load(req.TaskID)
 	if !ok {
-		logger.Errorf("task %s not found", req.TaskID)
-		return "", fmt.Errorf("task %s not found", req.TaskID)
+		// Do not return an error if the task is not found, just return an empty response.
+		logger.Warnf("task %s not found", req.TaskID)
+		return internaljob.MarshalResponse(&internaljob.GetTaskResponse{})
+	}
+
+	// Convert peer struct to peer response.
+	var peers []*internaljob.Peer
+	for _, peer := range task.LoadPeers() {
+		peers = append(peers, convertPeer(peer))
 	}
 
 	return internaljob.MarshalResponse(&internaljob.GetTaskResponse{
-		Peers: task.LoadPeers(),
+		Peers:              peers,
+		SchedulerClusterID: j.config.Manager.SchedulerClusterID,
 	})
 }
 
@@ -522,12 +531,13 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
 
 	task, ok := j.resource.TaskManager().Load(req.TaskID)
 	if !ok {
-		logger.Errorf("task %s not found", req.TaskID)
-		return "", fmt.Errorf("task %s not found", req.TaskID)
+		// Do not return an error if the task is not found, just return an empty response.
+ logger.Warnf("task %s not found", req.TaskID) + return internaljob.MarshalResponse(&internaljob.DeleteTaskResponse{}) } - successPeers := []*internaljob.DeletePeerResponse{} - failurePeers := []*internaljob.DeletePeerResponse{} + successPeers := []*internaljob.DeleteSuccessPeer{} + failurePeers := []*internaljob.DeleteFailurePeer{} finishedPeers := task.LoadFinishedPeers() for _, finishedPeer := range finishedPeers { @@ -537,34 +547,78 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr) if err != nil { log.Errorf("get client from %s failed: %s", addr, err.Error()) - failurePeers = append(failurePeers, &internaljob.DeletePeerResponse{ - Peer: finishedPeer, - Description: err.Error(), + failurePeers = append(failurePeers, &internaljob.DeleteFailurePeer{ + Peer: *convertPeer(finishedPeer), + Description: fmt.Sprintf("task %s failed: %s", req.TaskID, err.Error()), }) continue } - if err = dfdaemonClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{ + if err = dfdaemonClient.DeleteTask(ctx, &dfdaemonv2.DeleteTaskRequest{ TaskId: req.TaskID, }); err != nil { logger.Errorf("delete task failed: %s", err.Error()) - failurePeers = append(failurePeers, &internaljob.DeletePeerResponse{ - Peer: finishedPeer, - Description: err.Error(), + failurePeers = append(failurePeers, &internaljob.DeleteFailurePeer{ + Peer: *convertPeer(finishedPeer), + Description: fmt.Sprintf("task %s failed: %s", req.TaskID, err.Error()), }) continue } - successPeers = append(successPeers, &internaljob.DeletePeerResponse{ - Peer: finishedPeer, - Description: "", + successPeers = append(successPeers, &internaljob.DeleteSuccessPeer{ + Peer: *convertPeer(finishedPeer), }) } return internaljob.MarshalResponse(&internaljob.DeleteTaskResponse{ - FailurePeers: failurePeers, - SuccessPeers: successPeers, + FailurePeers: failurePeers, + SuccessPeers: successPeers, + SchedulerClusterID: j.config.Manager.SchedulerClusterID, }) } + +func convertPeer(p *resource.Peer) *internaljob.Peer { + peer := &internaljob.Peer{ + ID: p.ID, + Config: p.Config, + Range: p.Range, + Priority: int32(p.Priority), + FinishedPieces: p.FinishedPieces, + PieceCosts: p.PieceCosts(), + NeedBackToSource: p.NeedBackToSource != nil && p.NeedBackToSource.Load(), + } + + if p.BlockParents != nil && p.BlockParents.Len() > 0 { + peer.BlockParents = p.BlockParents.Values() + } + + if p.Cost != nil { + peer.Cost = p.Cost.Load() + } + + if p.PieceUpdatedAt != nil { + peer.PieceUpdatedAt = p.PieceUpdatedAt.Load() + } + + if p.CreatedAt != nil { + peer.CreatedAt = p.CreatedAt.Load() + } + + if p.UpdatedAt != nil { + peer.UpdatedAt = p.UpdatedAt.Load() + } + + peer.Pieces = make(map[int32]*resource.Piece) + p.Pieces.Range(func(key, value interface{}) bool { + k, ok1 := key.(int32) + v, ok2 := value.(*resource.Piece) + if ok1 && ok2 { + peer.Pieces[k] = v + } + return true + }) + + return peer +} diff --git a/scheduler/resource/seed_peer_client_mock.go b/scheduler/resource/seed_peer_client_mock.go index 74df28d9656..cf866e454bb 100644 --- a/scheduler/resource/seed_peer_client_mock.go +++ b/scheduler/resource/seed_peer_client_mock.go @@ -92,6 +92,25 @@ func (mr *MockSeedPeerClientMockRecorder) DeleteCacheTask(arg0, arg1 any, arg2 . return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteCacheTask", reflect.TypeOf((*MockSeedPeerClient)(nil).DeleteCacheTask), varargs...) } +// DeleteTask mocks base method. 
+func (m *MockSeedPeerClient) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteTask", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTask indicates an expected call of DeleteTask. +func (mr *MockSeedPeerClientMockRecorder) DeleteTask(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockSeedPeerClient)(nil).DeleteTask), varargs...) +} + // DownloadCacheTask mocks base method. func (m *MockSeedPeerClient) DownloadCacheTask(arg0 context.Context, arg1 *dfdaemon.DownloadCacheTaskRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_DownloadCacheTaskClient, error) { m.ctrl.T.Helper() @@ -224,6 +243,26 @@ func (mr *MockSeedPeerClientMockRecorder) StatCacheTask(arg0, arg1 any, arg2 ... return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatCacheTask", reflect.TypeOf((*MockSeedPeerClient)(nil).StatCacheTask), varargs...) } +// StatTask mocks base method. +func (m *MockSeedPeerClient) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) (*common0.Task, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "StatTask", varargs...) + ret0, _ := ret[0].(*common0.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StatTask indicates an expected call of StatTask. +func (mr *MockSeedPeerClientMockRecorder) StatTask(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockSeedPeerClient)(nil).StatTask), varargs...) +} + // SyncPieceTasks mocks base method. 
func (m *MockSeedPeerClient) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (cdnsystem.Seeder_SyncPieceTasksClient, error) { m.ctrl.T.Helper() diff --git a/test/e2e/v2/manager/preheat.go b/test/e2e/v2/manager/preheat.go index c312527e7e3..c25c3566e9c 100644 --- a/test/e2e/v2/manager/preheat.go +++ b/test/e2e/v2/manager/preheat.go @@ -406,7 +406,7 @@ var _ = Describe("Preheat with Manager", func() { }) }) -func waitForDone(preheat *models.Job, pod *util.PodExec) bool { +func waitForDone(job *models.Job, pod *util.PodExec) bool { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -419,12 +419,12 @@ func waitForDone(preheat *models.Job, pod *util.PodExec) bool { return false case <-ticker.C: out, err := pod.CurlCommand("", nil, nil, - fmt.Sprintf("http://dragonfly-manager.dragonfly-system.svc:8080/api/v1/jobs/%d", preheat.ID)).CombinedOutput() + fmt.Sprintf("http://dragonfly-manager.dragonfly-system.svc:8080/api/v1/jobs/%d", job.ID)).CombinedOutput() fmt.Println(string(out)) Expect(err).NotTo(HaveOccurred()) - err = json.Unmarshal(out, preheat) + err = json.Unmarshal(out, job) Expect(err).NotTo(HaveOccurred()) - switch preheat.State { + switch job.State { case machineryv1tasks.StateSuccess: return true case machineryv1tasks.StateFailure: diff --git a/test/e2e/v2/manager/task.go b/test/e2e/v2/manager/task.go new file mode 100644 index 00000000000..b0ef6173dd8 --- /dev/null +++ b/test/e2e/v2/manager/task.go @@ -0,0 +1,137 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package manager + +import ( + "encoding/json" + "fmt" + + . "github.com/onsi/ginkgo/v2" //nolint + . "github.com/onsi/gomega" //nolint + + internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/manager/types" + "d7y.io/dragonfly/v2/pkg/structure" + "d7y.io/dragonfly/v2/test/e2e/v2/util" +) + +var _ = Describe("Get and delete task with Manager", func() { + Context("get and delete /bin/cat task", func() { + It("get and delete task should be ok", func() { + // Create preheat job. 
+ managerPod, err := util.ManagerExec(0) + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + + req, err := structure.StructToMap(types.CreatePreheatJobRequest{ + Type: internaljob.PreheatJob, + Args: types.PreheatArgs{ + Type: "file", + URL: util.GetFileURL("/bin/cat"), + }, + SchedulerClusterIDs: []uint{1}, + }) + Expect(err).NotTo(HaveOccurred()) + + out, err := managerPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req, + "http://127.0.0.1:8080/api/v1/jobs").CombinedOutput() + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + fmt.Println(string(out)) + + job := &models.Job{} + err = json.Unmarshal(out, job) + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + + done := waitForDone(job, managerPod) + Expect(done).Should(BeTrue()) + + fileMetadata := util.FileMetadata{ + ID: "6f1e003b51a34df01dd80e3498dbaad584584d97888464b33b3e2c8442a3d485", + Sha256: "df954abca766aceddd79dd20429e4f222019018667446626d3a641d3c47c50fc", + } + + seedClientPods := make([]*util.PodExec, 3) + for i := 0; i < 3; i++ { + seedClientPods[i], err = util.SeedClientExec(i) + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + } + + // Check the file is downloaded successfully. + sha256sum, err := util.CalculateSha256ByTaskID(seedClientPods, fileMetadata.ID) + Expect(err).NotTo(HaveOccurred()) + Expect(fileMetadata.Sha256).To(Equal(sha256sum)) + + // Get task. + req, err = structure.StructToMap(types.CreateGetTaskJobRequest{ + Type: internaljob.GetTaskJob, + Args: types.GetTaskArgs{ + TaskID: fileMetadata.ID, + }, + }) + Expect(err).NotTo(HaveOccurred()) + out, err = managerPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req, + "http://127.0.0.1:8080/api/v1/jobs").CombinedOutput() + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + fmt.Println(string(out)) + + job = &models.Job{} + err = json.Unmarshal(out, job) + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + + done = waitForDone(job, managerPod) + Expect(done).Should(BeTrue()) + + // Check get task response is not null. + Expect(job.Result).NotTo(BeNil()) + + // Delete task. + req, err = structure.StructToMap(types.CreateDeleteTaskJobRequest{ + Type: internaljob.DeleteTaskJob, + Args: types.DeleteTaskArgs{ + TaskID: fileMetadata.ID, + }, + }) + Expect(err).NotTo(HaveOccurred()) + out, err = managerPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req, + "http://127.0.0.1:8080/api/v1/jobs").CombinedOutput() + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + fmt.Println(string(out)) + + job = &models.Job{} + err = json.Unmarshal(out, job) + fmt.Println(err) + Expect(err).NotTo(HaveOccurred()) + + done = waitForDone(job, managerPod) + Expect(done).Should(BeTrue()) + + // Check delete task response is not null. + Expect(job.Result).NotTo(BeNil()) + + // Check file is deleted successfully. + exist := util.CheckFilesExist(seedClientPods, fileMetadata.ID) + Expect(exist).Should(BeFalse()) + }) + }) +}) diff --git a/test/e2e/v2/util/task.go b/test/e2e/v2/util/task.go index 6c031343397..9184f1f1d7d 100644 --- a/test/e2e/v2/util/task.go +++ b/test/e2e/v2/util/task.go @@ -31,6 +31,20 @@ type TaskMetadata struct { Sha256 string } +// Check files is exist or not. +func CheckFilesExist(pods []*PodExec, taskID string) bool { + for _, pod := range pods { + contentPath := fmt.Sprintf("%s/%s", clientContentDir, taskID) + if _, err := pod.Command("ls", contentPath).CombinedOutput(); err != nil { + // If the path does not exist, skip this client. 
+ fmt.Printf("path %s does not exist: %s\n", contentPath, err.Error()) + continue + } + return true + } + return false +} + func CalculateSha256ByTaskID(pods []*PodExec, taskID string) (string, error) { var sha256sum string for _, pod := range pods {