Skip to content

Commit

Permalink
Merge pull request #6892 from ant-media/S3TransferBufferSizeConfigura…
Browse files Browse the repository at this point in the history
…tion

S3 transfer buffer size configuration
  • Loading branch information
mekya authored Dec 21, 2024
2 parents f1a7417 + bb238e2 commit 60001d8
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2288,6 +2288,7 @@ public void setStorageclientSettings(AppSettings settings) {
storageClient.setPermission(settings.getS3Permission());
storageClient.setStorageClass(settings.getS3StorageClass());
storageClient.setCacheControl(settings.getS3CacheControl());
storageClient.setTransferBufferSize(settings.getS3TransferBufferSizeInBytes());
storageClient.reset();
}

Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/antmedia/AppSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,7 @@ public class AppSettings implements Serializable{
public static final String APPLICATION_STATUS_INSTALLATION_FAILED = "installationFailed";



/**
* Describes the application installation status. Possible values:
*
Expand Down Expand Up @@ -2000,6 +2001,17 @@ public class AppSettings implements Serializable{
*/
@Value("${s3Permission:${"+SETTINGS_S3_PERMISSION+":public-read}}")
private String s3Permission = "public-read";


/**
* S3 Transfer Buffer Size
* This describes to buffer size to keep transferring data. It should be
* bigger than ts segment file size for HLS continuous upload.
* Otherwise chunk update may cannot be retried in case of any network break.
*/
@Value("${s3TransferBufferSizeInBytes:10000000}")
private int s3TransferBufferSizeInBytes = 10000000;


/**
* HLS Encryption key info file full path.
Expand Down Expand Up @@ -4138,4 +4150,12 @@ public long getAppInstallationTime() {
public void setAppInstallationTime(long appInstallationTime) {
this.appInstallationTime = appInstallationTime;
}

public int getS3TransferBufferSizeInBytes() {
return s3TransferBufferSizeInBytes;
}

public void setS3TransferBufferSizeInBytes(int s3TransferBufferSizeInBytes) {
this.s3TransferBufferSizeInBytes = s3TransferBufferSizeInBytes;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/antmedia/servlet/UploadHLSChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void uploadHLSChunk(StorageClient storageClient, ConfigurableWebApplicati

String s3FileKey = getS3Key(req, appSettings);

//TODO: we overwrite progressListener for the ongoing upload here. This may make logs misleading.
storageClient.setProgressListener(event -> {
if (event.getEventType() == ProgressEventType.TRANSFER_FAILED_EVENT)
{
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/antmedia/storage/AmazonS3StorageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ public void save(String key, File file, InputStream inputStream, boolean deleteL
{
putRequest = new PutObjectRequest(getStorageName(), key, inputStream, metadata);
}


putRequest.getRequestClientOptions().setReadLimit(getTransferBufferSize());

putRequest.setCannedAcl(getCannedAcl());

if(checkStorageClass(getStorageClass())){
Expand Down Expand Up @@ -187,7 +189,7 @@ public void save(String key, File file, InputStream inputStream, boolean deleteL
}
}

private void listenUploadProgress(String key, File file, boolean deleteLocalFile, Upload upload) {
public void listenUploadProgress(String key, File file, boolean deleteLocalFile, Upload upload) {
upload.addProgressListener((ProgressListener)event ->
{
if (event.getEventType() == ProgressEventType.TRANSFER_FAILED_EVENT)
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/antmedia/storage/StorageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public abstract class StorageClient {

protected ProgressListener progressListener;


private int transferBufferSize;

/**
* Delete file from storage
*
Expand Down Expand Up @@ -180,4 +181,13 @@ public String getCacheControl() {
public void setCacheControl(String cacheControl) {
this.cacheControl = cacheControl;
}

public int getTransferBufferSize() {
return transferBufferSize;
}


public void setTransferBufferSize(int transferBufferSize) {
this.transferBufferSize = transferBufferSize;
}
}
8 changes: 7 additions & 1 deletion src/test/java/io/antmedia/test/AppSettingsUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

import com.amazonaws.RequestClientOptions;
import com.google.gson.Gson;

import io.antmedia.AntMediaApplicationAdapter;
Expand Down Expand Up @@ -652,13 +653,18 @@ public void testUnsetAppSettings(AppSettings appSettings) {

appSettings.setAppInstallationTime(100);
assertEquals(100, appSettings.getAppInstallationTime());


assertEquals(10000000, appSettings.getS3TransferBufferSizeInBytes());
appSettings.setS3TransferBufferSizeInBytes(50000);
assertEquals(50000, appSettings.getS3TransferBufferSizeInBytes());

//if we add a new field, we just need to check its default value in this test
//When a new field is added or removed please update the number of fields and make this test pass
//by also checking its default value.

assertEquals("New field is added to settings. PAY ATTENTION: Please CHECK ITS DEFAULT VALUE and fix the number of fields.",
196, numberOfFields);
197, numberOfFields);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -27,6 +33,7 @@
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
Expand Down Expand Up @@ -276,5 +283,28 @@ public void testCannedAcl() {
assertEquals(CannedAccessControlList.AwsExecRead, storage.getCannedAcl());


}

@Test
public void testS3TransferBufferSize() {
AmazonS3StorageClient storage = spy(new AmazonS3StorageClient());
int s3TransferBufferSize = 5000;

storage.setTransferBufferSize(s3TransferBufferSize);

TransferManager tm = Mockito.mock(TransferManager.class);
Mockito.doReturn(tm).when(storage).getTransferManager();
Mockito.doReturn(true).when(storage).isEnabled();
Mockito.doNothing().when(storage).listenUploadProgress(anyString(), nullable(File.class), anyBoolean(), any());


storage.save("key", null, mock(InputStream.class), false, false);

ArgumentCaptor<PutObjectRequest> putObjectRequestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
Mockito.verify(tm).upload(putObjectRequestCaptor.capture());
assertEquals(s3TransferBufferSize, putObjectRequestCaptor.getValue().getRequestClientOptions().getReadLimit());



}
}

0 comments on commit 60001d8

Please sign in to comment.