From dd03c979759f9f721b2ea0acfcd51f1ed67088cb Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 10 Jan 2024 16:42:55 +0800 Subject: [PATCH] feat: prevent concurrent request to cause state switch failed (#3014) Signed-off-by: Gaius --- scheduler/config/constants.go | 6 +++--- scheduler/scheduling/scheduling.go | 1 - scheduler/service/service_v2.go | 24 ++++++++++++++---------- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index a603b5734df..ae9c6301605 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -64,13 +64,13 @@ const ( DefaultSchedulerBackToSourceCount = 3 // DefaultSchedulerRetryBackToSourceLimit is default retry back-to-source limit for scheduler. - DefaultSchedulerRetryBackToSourceLimit = 30 + DefaultSchedulerRetryBackToSourceLimit = 10 // DefaultSchedulerRetryLimit is default retry limit for scheduler. - DefaultSchedulerRetryLimit = 40 + DefaultSchedulerRetryLimit = 15 // DefaultSchedulerRetryInterval is default retry interval for scheduler. - DefaultSchedulerRetryInterval = 100 * time.Millisecond + DefaultSchedulerRetryInterval = 300 * time.Millisecond // DefaultSchedulerPieceDownloadTimeout is default timeout of downloading piece. DefaultSchedulerPieceDownloadTimeout = 30 * time.Minute diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 30eecd18cf0..cb7d4bd58dc 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -504,7 +504,6 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S filterParentLimit = int(config.FilterParentLimit) } } - peer.Log.Debugf("filter parent limit is %d", filterParentLimit) var ( candidateParents []*resource.Peer diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 98bf0f0fe0e..751115a6717 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -927,11 +927,13 @@ func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() // Handle peer with peer started request. - if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil { - // Collect DownloadPeerStartedFailureCount metrics. - metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() - return status.Error(codes.Internal, err.Error()) + if !peer.FSM.Is(resource.PeerStateRunning) { + if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil { + // Collect DownloadPeerStartedFailureCount metrics. + metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return status.Error(codes.Internal, err.Error()) + } } return nil @@ -950,11 +952,13 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() // Handle peer with peer back-to-source started request. - if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { - // Collect DownloadPeerBackToSourceStartedFailureCount metrics. - metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() - return status.Error(codes.Internal, err.Error()) + if !peer.FSM.Is(resource.PeerStateRunning) { + if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { + // Collect DownloadPeerBackToSourceStartedFailureCount metrics. + metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return status.Error(codes.Internal, err.Error()) + } } return nil