Skip to content

Commit

Permalink
Merge pull request #338 from esuomi/rework_s3_integration_to_use_nona…
Browse files Browse the repository at this point in the history
…sync_client

rework S3BlobStoreRepository to use the more stable AWS S3 (non-async) client instead
  • Loading branch information
testower authored Jun 11, 2024
2 parents 8791537 + 0cf7434 commit 89ade25
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 99 deletions.
9 changes: 2 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,10 @@
Including HTTP client implementations directly allows for configuring and customization of the clients.
See https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-apache.html
-->
<!-- asynchronous HTTP client for all service clients -->
<!-- synchronous HTTP client for all service clients -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>
<!-- CRT client is a specialization of the async client which allows for increased performance -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
<artifactId>apache-client</artifactId>
</dependency>
<!-- AWS Simple Storage Service service client -->
<dependency>
Expand Down
93 changes: 39 additions & 54 deletions src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepository.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
package no.entur.uttu.export.blob;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.rutebanken.helper.storage.BlobAlreadyExistsException;
import org.rutebanken.helper.storage.BlobStoreException;
import org.rutebanken.helper.storage.repository.BlobStoreRepository;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

/**
* <a href="https://aws.amazon.com/s3/">AWS S3</a> backed implementation of {@link BlobStoreRepository}.
Expand All @@ -33,22 +32,20 @@ public class S3BlobStoreRepository implements BlobStoreRepository {
*/
private static final long UNKNOWN_LATEST_VERSION = 0;

private final S3AsyncClient s3AsyncClient;
private final S3Client s3Client;

private String containerName;

public S3BlobStoreRepository(S3AsyncClient s3AsyncClient) {
this.s3AsyncClient = Objects.requireNonNull(s3AsyncClient);
public S3BlobStoreRepository(S3Client s3Client) {
this.s3Client = Objects.requireNonNull(s3Client);
}

@Override
public InputStream getBlob(String objectName) {
return s3AsyncClient
.getObject(
GetObjectRequest.builder().bucket(containerName).key(objectName).build(),
AsyncResponseTransformer.toBlockingInputStream()
)
.join();
return s3Client.getObject(
GetObjectRequest.builder().bucket(containerName).key(objectName).build(),
ResponseTransformer.toInputStream()
);
}

@Override
Expand All @@ -58,11 +55,14 @@ public long uploadBlob(String objectName, InputStream inputStream) {

@Override
public long uploadBlob(String objectName, InputStream inputStream, String contentType) {
BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(
null
); // 'null' indicates a stream will be provided later.
RequestBody body = null;
try {
body = RequestBody.fromBytes(inputStream.readAllBytes());
} catch (IOException e) {
throw new BlobStoreException("Failed to read all bytes from given InputStream", e);
}

CompletableFuture<PutObjectResponse> responseFuture = s3AsyncClient.putObject(
s3Client.putObject(
r -> {
r.bucket(containerName).key(objectName);
if (contentType != null) {
Expand All @@ -71,11 +71,6 @@ public long uploadBlob(String objectName, InputStream inputStream, String conten
},
body
);

// Provide the stream of data to be uploaded.
long v = body.writeInputStream(inputStream);

PutObjectResponse r = responseFuture.join(); // Wait for the response.
return UNKNOWN_LATEST_VERSION;
}

Expand All @@ -91,13 +86,14 @@ public long uploadNewBlob(String objectName, InputStream inputStream) {
}

private boolean objectExists(String containerName, String objectName) {
return s3AsyncClient
.headObject(headObjectRequest ->
try {
s3Client.headObject(headObjectRequest ->
headObjectRequest.bucket(containerName).key(objectName)
)
.exceptionally(throwable -> null)
.thenApply(Objects::nonNull)
.join();
);
return true;
} catch (NoSuchKeyException e) {
return false;
}
}

@Override
Expand All @@ -107,15 +103,13 @@ public void copyBlob(
String targetContainerName,
String targetObjectName
) {
s3AsyncClient
.copyObject(copyObjectRequest ->
copyObjectRequest
.sourceBucket(sourceContainerName)
.sourceKey(sourceObjectName)
.destinationBucket(targetContainerName)
.destinationKey(targetObjectName)
)
.join();
s3Client.copyObject(copyObjectRequest ->
copyObjectRequest
.sourceBucket(sourceContainerName)
.sourceKey(sourceObjectName)
.destinationBucket(targetContainerName)
.destinationKey(targetObjectName)
);
}

@Override
Expand Down Expand Up @@ -164,7 +158,7 @@ private String trimPrefix(String prefix, String s) {

@Override
public boolean delete(String objectName) {
s3AsyncClient.deleteObject(r -> r.bucket(containerName).key(objectName)).join();
s3Client.deleteObject(r -> r.bucket(containerName).key(objectName));

return !objectExists(containerName, objectName);
}
Expand All @@ -179,11 +173,10 @@ public boolean deleteAllFilesInFolder(String folder) {
.stream()
.map(s3Object -> ObjectIdentifier.builder().key(s3Object.key()).build())
.toList();
return s3AsyncClient
return s3Client
.deleteObjects(deleteObjectsRequest ->
deleteObjectsRequest.delete(delete -> delete.objects(objectIdentifiers))
)
.join()
.errors()
.isEmpty();
}
Expand All @@ -200,20 +193,12 @@ private <U> List<U> iteratePrefix(
String prefix,
Function<List<S3Object>, U> mapper
) {
ListObjectsV2Publisher publisher = s3AsyncClient.listObjectsV2Paginator(req ->
ListObjectsV2Iterable iterable = s3Client.listObjectsV2Paginator(req ->
req.bucket(containerName).prefix(prefix)
);

List<U> pageResults = new ArrayList<>();
CompletableFuture<Void> future = publisher.subscribe(res ->
pageResults.add(mapper.apply(res.contents()))
);
try {
future.get();
} catch (InterruptedException | ExecutionException ignored) {
// ignored on purpose
}

iterable.forEach(a -> pageResults.add(mapper.apply(a.contents())));
return pageResults;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package no.entur.uttu.export.blob;

import java.net.URI;
import java.time.Duration;
import org.rutebanken.helper.storage.repository.BlobStoreRepository;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,9 +27,10 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

@Configuration
@Profile("s3-blobstore")
Expand All @@ -43,11 +45,9 @@ public class S3BlobStoreRepositoryConfig {
@Bean
BlobStoreRepository blobStoreRepository(
@Value("${blobstore.s3.bucket}") String containerName,
S3AsyncClient s3AsyncClient
S3Client s3Client
) {
S3BlobStoreRepository s3BlobStoreRepository = new S3BlobStoreRepository(
s3AsyncClient
);
S3BlobStoreRepository s3BlobStoreRepository = new S3BlobStoreRepository(s3Client);
s3BlobStoreRepository.setContainerName(containerName);
return s3BlobStoreRepository;
}
Expand All @@ -70,14 +70,22 @@ public AwsCredentialsProvider cloudCredentials() {
}

@Bean
public S3AsyncClient s3AsyncClient(AwsCredentialsProvider credentialsProvider) {
S3CrtAsyncClientBuilder b = S3AsyncClient
.crtBuilder()
public S3Client s3Client(AwsCredentialsProvider credentialsProvider) {
S3ClientBuilder builder = S3Client
.builder()
.region(Region.of(region))
.credentialsProvider(credentialsProvider)
.region(Region.of(region));
.overrideConfiguration(
ClientOverrideConfiguration
.builder()
.apiCallAttemptTimeout(Duration.ofSeconds(15))
.apiCallTimeout(Duration.ofSeconds(15))
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5))
.build()
);
if (endpointOverride != null) {
b = b.endpointOverride(URI.create(endpointOverride));
builder = builder.endpointOverride(URI.create(endpointOverride));
}
return b.build();
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import no.entur.uttu.UttuIntegrationTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
Expand All @@ -19,10 +18,10 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

@Testcontainers
@ActiveProfiles({ "s3-blobstore" })
Expand All @@ -33,7 +32,7 @@ public class S3BlobStoreRepositoryTest extends UttuIntegrationTest {
private static LocalStackContainer localStack;

@Autowired
private S3AsyncClient s3AsyncClient;
private S3Client s3Client;

@Autowired
private S3BlobStoreRepository blobStore;
Expand All @@ -50,19 +49,12 @@ static void blobStoreProperties(DynamicPropertyRegistry registry) {
registry.add("blobstore.s3.bucket", () -> TEST_BUCKET);
}

private void createBucket(String bucketName)
throws ExecutionException, InterruptedException {
s3AsyncClient
.headBucket(request -> request.bucket(bucketName))
.exceptionally(throwable -> {
if (throwable.getCause() instanceof NoSuchBucketException) {
s3AsyncClient
.createBucket(CreateBucketRequest.builder().bucket(bucketName).build())
.join();
}
return null;
})
.get();
private void createBucket(String bucketName) {
try {
s3Client.headBucket(request -> request.bucket(bucketName));
} catch (NoSuchBucketException e) {
s3Client.createBucket(request -> request.bucket(bucketName));
}
}

@BeforeClass
Expand Down Expand Up @@ -114,9 +106,9 @@ public void canSetContentTypeForUpload() throws Exception {
contentType
);
assertBlobExists(TEST_BUCKET, "json", true);
HeadObjectResponse response = s3AsyncClient
.headObject(request -> request.bucket(TEST_BUCKET).key("json"))
.join();
HeadObjectResponse response = s3Client.headObject(request ->
request.bucket(TEST_BUCKET).key("json")
);
Assert.assertEquals(contentType, response.contentType());
}

Expand Down Expand Up @@ -180,12 +172,11 @@ public void canCopyAllBlobsWithSharedPrefix() throws Exception {
return new ByteArrayInputStream(source.getBytes(StandardCharsets.UTF_8));
}

private void assertBlobExists(String bucket, String key, boolean exists)
throws ExecutionException, InterruptedException {
HeadObjectResponse response = s3AsyncClient
.headObject(request -> request.bucket(bucket).key(key))
.exceptionally(throwable -> null)
.get();
private void assertBlobExists(String bucket, String key, boolean exists) {
HeadObjectResponse response = null;
try {
response = s3Client.headObject(request -> request.bucket(bucket).key(key));
} catch (NoSuchKeyException ignored) {}
if (!exists && response != null) {
Assert.fail(bucket + " / " + key + " exists");
}
Expand Down

0 comments on commit 89ade25

Please sign in to comment.