From a576b7f888bda1a0b5cce1ce31710b037f2b3518 Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Fri, 4 Oct 2024 11:34:14 +0200 Subject: [PATCH 1/6] ci: Enable data race detector in tests --- .github/workflows/continuous-integration.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/continuous-integration.yaml b/.github/workflows/continuous-integration.yaml index c639222e5..fa5de61be 100644 --- a/.github/workflows/continuous-integration.yaml +++ b/.github/workflows/continuous-integration.yaml @@ -49,8 +49,8 @@ jobs: - name: Test code run: | - go test ./pkg/... - go test ./internal/... + go test -race ./pkg/... + go test -race ./internal/... shell: bash - From 9b68f9f85f4d71dfd20e810c8e752ca67ca83c10 Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Fri, 4 Oct 2024 12:28:41 +0200 Subject: [PATCH 2/6] handler: Fix data race in bodyReader when stopping upload --- pkg/handler/body_reader.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/handler/body_reader.go b/pkg/handler/body_reader.go index ab9c8d3cd..86217f3a0 100644 --- a/pkg/handler/body_reader.go +++ b/pkg/handler/body_reader.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "strings" + "sync" "sync/atomic" "time" ) @@ -28,8 +29,11 @@ type bodyReader struct { bytesCounter int64 ctx *httpContext reader io.ReadCloser - err error onReadDone func() + + // lock protects concurrent access to err. + lock sync.RWMutex + err error } func newBodyReader(c *httpContext, maxSize int64) *bodyReader { @@ -41,7 +45,10 @@ func newBodyReader(c *httpContext, maxSize int64) *bodyReader { } func (r *bodyReader) Read(b []byte) (int, error) { - if r.err != nil { + r.lock.RLock() + hasErrored := r.err != nil + r.lock.RUnlock() + if hasErrored { return 0, io.EOF } @@ -99,20 +106,26 @@ func (r *bodyReader) Read(b []byte) (int, error) { // Other errors are stored for retrival with hasError, but is not returned // to the consumer. 
We do not overwrite an error if it has been set already. + r.lock.Lock() if r.err == nil { r.err = err } + r.lock.Unlock() } return n, nil } func (r bodyReader) hasError() error { - if r.err == io.EOF { + r.lock.RLock() + err := r.err + r.lock.RUnlock() + + if err == io.EOF { return nil } - return r.err + return err } func (r *bodyReader) bytesRead() int64 { @@ -120,7 +133,9 @@ func (r *bodyReader) bytesRead() int64 { } func (r *bodyReader) closeWithError(err error) { + r.lock.Lock() r.err = err + r.lock.Unlock() // SetReadDeadline with the current time causes concurrent reads to the body to time out, // so the body will be closed sooner with less delay. From 94a2c11f6cd38d678188954ff4aac232f6774aa7 Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Fri, 4 Oct 2024 13:18:11 +0200 Subject: [PATCH 3/6] s3store: Prevent data race using `errgroup` package --- go.mod | 2 +- pkg/s3store/s3store.go | 51 +++++++++++++++++++++++------------------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index cea5c5a6a..b278b7760 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/vimeo/go-util v1.4.1 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df golang.org/x/net v0.29.0 + golang.org/x/sync v0.8.0 google.golang.org/api v0.199.0 google.golang.org/grpc v1.67.0 google.golang.org/protobuf v1.34.2 @@ -95,7 +96,6 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.6.0 // indirect diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index db4f29fde..5e61718e7 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -88,6 +88,7 @@ import ( "github.com/tus/tusd/v2/internal/uid" "github.com/tus/tusd/v2/pkg/handler" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" 
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -469,8 +470,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re }() go partProducer.produce(producerCtx, optimalPartSize) - var wg sync.WaitGroup - var uploadErr error + var eg errgroup.Group for { // We acquire the semaphore before starting the goroutine to avoid @@ -497,10 +497,8 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re } upload.parts = append(upload.parts, part) - wg.Add(1) - go func(file io.ReadSeeker, part *s3Part, closePart func() error) { + eg.Go(func() error { defer upload.store.releaseUploadSemaphore() - defer wg.Done() t := time.Now() uploadPartInput := &s3.UploadPartInput{ @@ -509,39 +507,46 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re UploadId: aws.String(upload.multipartId), PartNumber: aws.Int32(part.number), } - etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size) + etag, err := upload.putPartForUpload(ctx, uploadPartInput, partfile, part.size) store.observeRequestDuration(t, metricUploadPart) - if err != nil { - uploadErr = err - } else { + if err == nil { part.etag = etag } - if cerr := closePart(); cerr != nil && uploadErr == nil { - uploadErr = cerr + + cerr := closePart() + if err != nil { + return err + } + if cerr != nil { + return cerr } - }(partfile, part, closePart) + return nil + }) } else { - wg.Add(1) - go func(file io.ReadSeeker, closePart func() error) { + eg.Go(func() error { defer upload.store.releaseUploadSemaphore() - defer wg.Done() - if err := store.putIncompletePartForUpload(ctx, upload.objectId, file); err != nil { - uploadErr = err + err := store.putIncompletePartForUpload(ctx, upload.objectId, partfile) + if err == nil { + upload.incompletePartSize = partsize } - if cerr := closePart(); cerr != nil && uploadErr == nil { - uploadErr = cerr + + cerr := closePart() + if err != nil { + return err + } + if cerr != 
nil { + return cerr } - upload.incompletePartSize = partsize - }(partfile, closePart) + return nil + }) } bytesUploaded += partsize nextPartNum += 1 } - wg.Wait() - + uploadErr := eg.Wait() if uploadErr != nil { return 0, uploadErr } From ce3d0fc2394c2108fff65638d756709347d36cf5 Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Fri, 4 Oct 2024 13:30:34 +0200 Subject: [PATCH 4/6] s3store: Fix data race in `concatUsingMultipart` --- pkg/s3store/multi_error.go | 1 + pkg/s3store/s3store.go | 26 ++++++++++---------------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/pkg/s3store/multi_error.go b/pkg/s3store/multi_error.go index a2d9e3db9..78fd97ece 100644 --- a/pkg/s3store/multi_error.go +++ b/pkg/s3store/multi_error.go @@ -4,6 +4,7 @@ import ( "errors" ) +// TODO: Replace with errors.Join func newMultiError(errs []error) error { message := "Multiple errors occurred:\n" for _, err := range errs { diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 5e61718e7..23f4d90fe 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -974,13 +974,10 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error { store := upload.store - numPartialUploads := len(partialUploads) - errs := make([]error, 0, numPartialUploads) - // Copy partial uploads concurrently - var wg sync.WaitGroup - wg.Add(numPartialUploads) + var eg errgroup.Group for i, partialUpload := range partialUploads { + // Part numbers must be in the range of 1 to 10000, inclusive. Since // slice indexes start at 0, we add 1 to ensure that i >= 1. 
partNumber := int32(i + 1) @@ -992,29 +989,26 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads etag: "", }) - go func(partNumber int32, sourceObject string) { - defer wg.Done() - + eg.Go(func() error { res, err := store.Service.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(upload.objectId), UploadId: aws.String(upload.multipartId), PartNumber: aws.Int32(partNumber), - CopySource: aws.String(store.Bucket + "/" + *store.keyWithPrefix(sourceObject)), + CopySource: aws.String(store.Bucket + "/" + *store.keyWithPrefix(partialS3Upload.objectId)), }) if err != nil { - errs = append(errs, err) - return + return err } upload.parts[partNumber-1].etag = *res.CopyPartResult.ETag - }(partNumber, partialS3Upload.objectId) + return nil + }) } - wg.Wait() - - if len(errs) > 0 { - return newMultiError(errs) + err := eg.Wait() + if err != nil { + return err } return upload.FinishUpload(ctx) From 148fa2fd9499a4e82d367aa61db3a3afcb941f33 Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Fri, 4 Oct 2024 13:40:29 +0200 Subject: [PATCH 5/6] fixup! 
s3store: Fix data race in `concatUsingMultipart` --- pkg/s3store/s3store.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 23f4d90fe..e592a6482 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -974,6 +974,8 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error { store := upload.store + upload.parts = make([]*s3Part, len(partialUploads)) + // Copy partial uploads concurrently var eg errgroup.Group for i, partialUpload := range partialUploads { @@ -983,12 +985,6 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads partNumber := int32(i + 1) partialS3Upload := partialUpload.(*s3Upload) - upload.parts = append(upload.parts, &s3Part{ - number: partNumber, - size: -1, - etag: "", - }) - eg.Go(func() error { res, err := store.Service.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ Bucket: aws.String(store.Bucket), @@ -1001,7 +997,12 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads return err } - upload.parts[partNumber-1].etag = *res.CopyPartResult.ETag + upload.parts[partNumber-1] = &s3Part{ + number: partNumber, + size: -1, // -1 is fine here because FinishUpload does not need this info. + etag: *res.CopyPartResult.ETag, + } + return nil }) } From 6afee2ba2cfe0607e466385eea61e365d6d16bef Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Fri, 4 Oct 2024 13:47:32 +0200 Subject: [PATCH 6/6] fixup!
handler: Fix data race in bodyReader when stopping upload --- pkg/handler/body_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/handler/body_reader.go b/pkg/handler/body_reader.go index 86217f3a0..ed3a5f639 100644 --- a/pkg/handler/body_reader.go +++ b/pkg/handler/body_reader.go @@ -116,7 +116,7 @@ func (r *bodyReader) Read(b []byte) (int, error) { return n, nil } -func (r bodyReader) hasError() error { +func (r *bodyReader) hasError() error { r.lock.RLock() err := r.err r.lock.RUnlock()