Skip to content

Commit

Permalink
feat: prevent concurrent requests from causing state switch failures (#3014)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 10, 2024
1 parent 8508844 commit dd03c97
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
6 changes: 3 additions & 3 deletions scheduler/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ const (
DefaultSchedulerBackToSourceCount = 3

// DefaultSchedulerRetryBackToSourceLimit is default retry back-to-source limit for scheduler.
- DefaultSchedulerRetryBackToSourceLimit = 30
+ DefaultSchedulerRetryBackToSourceLimit = 10

// DefaultSchedulerRetryLimit is default retry limit for scheduler.
- DefaultSchedulerRetryLimit = 40
+ DefaultSchedulerRetryLimit = 15

// DefaultSchedulerRetryInterval is default retry interval for scheduler.
- DefaultSchedulerRetryInterval = 100 * time.Millisecond
+ DefaultSchedulerRetryInterval = 300 * time.Millisecond

// DefaultSchedulerPieceDownloadTimeout is default timeout of downloading piece.
DefaultSchedulerPieceDownloadTimeout = 30 * time.Minute
Expand Down
1 change: 0 additions & 1 deletion scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
filterParentLimit = int(config.FilterParentLimit)
}
}
- peer.Log.Debugf("filter parent limit is %d", filterParentLimit)

var (
candidateParents []*resource.Peer
Expand Down
24 changes: 14 additions & 10 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,11 +927,13 @@ func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

// Handle peer with peer started request.
- if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
- // Collect DownloadPeerStartedFailureCount metrics.
- metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
- peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
- return status.Error(codes.Internal, err.Error())
+ if !peer.FSM.Is(resource.PeerStateRunning) {
+ if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
+ // Collect DownloadPeerStartedFailureCount metrics.
+ metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
+ peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
+ return status.Error(codes.Internal, err.Error())
+ }
}

return nil
Expand All @@ -950,11 +952,13 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

// Handle peer with peer back-to-source started request.
- if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
- // Collect DownloadPeerBackToSourceStartedFailureCount metrics.
- metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
- peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
- return status.Error(codes.Internal, err.Error())
+ if !peer.FSM.Is(resource.PeerStateRunning) {
+ if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
+ // Collect DownloadPeerBackToSourceStartedFailureCount metrics.
+ metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
+ peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
+ return status.Error(codes.Internal, err.Error())
+ }
}

return nil
Expand Down

0 comments on commit dd03c97

Please sign in to comment.