Skip to content

Commit

Permalink
azurestore: Buffer upload data on disk instead of in-memory
Browse files Browse the repository at this point in the history
Closes #1031
  • Loading branch information
Acconut committed Jan 23, 2024
1 parent 9f0aebe commit 1231fd5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
47 changes: 37 additions & 10 deletions pkg/azurestore/azurestore.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package azurestore

import (
"bufio"
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"strings"

"github.com/tus/tusd/v2/internal/uid"
Expand All @@ -18,13 +19,20 @@ type AzureStore struct {
Service AzService
ObjectPrefix string
Container string

// TemporaryDirectory is the path where AzureStore will create temporary files
// on disk during the upload. An empty string ("", the default value) will
// cause AzureStore to use the operating system's default temporary directory.
TemporaryDirectory string
}

type AzUpload struct {
ID string
InfoBlob AzBlob
BlockBlob AzBlob
InfoHandler *handler.FileInfo

tempDir string
}

func New(service AzService) *AzureStore {
Expand Down Expand Up @@ -73,6 +81,7 @@ func (store AzureStore) NewUpload(ctx context.Context, info handler.FileInfo) (h
InfoHandler: &info,
InfoBlob: infoBlob,
BlockBlob: blockBlob,
tempDir: store.TemporaryDirectory,
}

err = azUpload.writeInfo(ctx)
Expand Down Expand Up @@ -128,6 +137,7 @@ func (store AzureStore) GetUpload(ctx context.Context, id string) (handler.Uploa
InfoHandler: &info,
InfoBlob: infoBlob,
BlockBlob: blockBlob,
tempDir: store.TemporaryDirectory,
}, nil
}

Expand All @@ -140,21 +150,38 @@ func (store AzureStore) AsLengthDeclarableUpload(upload handler.Upload) handler.
}

func (upload *AzUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
r := bufio.NewReader(src)
buf := new(bytes.Buffer)
n, err := r.WriteTo(buf)
// Create a temporary file for holding the uploaded data
file, err := os.CreateTemp(upload.tempDir, "tusd-az-tmp-")
if err != nil {
return 0, err
}
defer os.Remove(file.Name())

// Copy the entire request body into the file
n, err := io.Copy(file, src)
if err != nil {
file.Close()
return 0, err
}

chunkSize := int64(binary.Size(buf.Bytes()))
if chunkSize > MaxBlockBlobChunkSize {
return 0, fmt.Errorf("azurestore: Chunk of size %v too large. Max chunk size is %v", chunkSize, MaxBlockBlobChunkSize)
// Seek to the beginning
if _, err := file.Seek(0, 0); err != nil {
file.Close()
return 0, err
}

if n > MaxBlockBlobChunkSize {
file.Close()
return 0, fmt.Errorf("azurestore: Chunk of size %v too large. Max chunk size is %v", n, MaxBlockBlobChunkSize)
}

re := bytes.NewReader(buf.Bytes())
err = upload.BlockBlob.Upload(ctx, re)
err = upload.BlockBlob.Upload(ctx, file)
if err != nil {
file.Close()
return 0, err
}

if err := file.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
return 0, err
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/azurestore/azurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,12 @@ func TestWriteChunk(t *testing.T) {
infoBlob.EXPECT().Download(ctx).Return(newReadCloser(data), nil).Times(1),
service.EXPECT().NewBlob(ctx, mockID).Return(blockBlob, nil).Times(1),
blockBlob.EXPECT().GetOffset(ctx).Return(offset, nil).Times(1),
blockBlob.EXPECT().Upload(ctx, bytes.NewReader([]byte(mockReaderData))).Return(nil).Times(1),
blockBlob.EXPECT().Upload(ctx, gomock.Any()).DoAndReturn(func(_ context.Context, reader io.ReadSeeker) error {
actual, err := io.ReadAll(reader)
assert.Nil(err)
assert.Equal(mockReaderData, string(actual))
return nil
}).Times(1),
)

upload, err := store.GetUpload(ctx, mockID)
Expand Down

0 comments on commit 1231fd5

Please sign in to comment.