diff --git a/pom.xml b/pom.xml index 1634a092..f6f834cc 100644 --- a/pom.xml +++ b/pom.xml @@ -308,15 +308,10 @@ Including HTTP client implementations directly allows for configuring and customization of the clients. See https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-apache.html --> - + software.amazon.awssdk - netty-nio-client - - - - software.amazon.awssdk - aws-crt-client + apache-client diff --git a/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepository.java b/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepository.java index b1a09d57..f33bee92 100644 --- a/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepository.java +++ b/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepository.java @@ -1,23 +1,22 @@ package no.entur.uttu.export.blob; +import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.function.Function; import org.rutebanken.helper.storage.BlobAlreadyExistsException; +import org.rutebanken.helper.storage.BlobStoreException; import org.rutebanken.helper.storage.repository.BlobStoreRepository; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; -import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; -import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; /** * AWS S3 backed implementation of {@link BlobStoreRepository}. @@ -33,22 +32,20 @@ public class S3BlobStoreRepository implements BlobStoreRepository { */ private static final long UNKNOWN_LATEST_VERSION = 0; - private final S3AsyncClient s3AsyncClient; + private final S3Client s3Client; private String containerName; - public S3BlobStoreRepository(S3AsyncClient s3AsyncClient) { - this.s3AsyncClient = Objects.requireNonNull(s3AsyncClient); + public S3BlobStoreRepository(S3Client s3Client) { + this.s3Client = Objects.requireNonNull(s3Client); } @Override public InputStream getBlob(String objectName) { - return s3AsyncClient - .getObject( - GetObjectRequest.builder().bucket(containerName).key(objectName).build(), - AsyncResponseTransformer.toBlockingInputStream() - ) - .join(); + return s3Client.getObject( + GetObjectRequest.builder().bucket(containerName).key(objectName).build(), + ResponseTransformer.toInputStream() + ); } @Override @@ -58,11 +55,14 @@ public long uploadBlob(String objectName, InputStream inputStream) { @Override public long uploadBlob(String objectName, InputStream inputStream, String contentType) { - BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream( - null - ); // 'null' indicates a stream will be provided later. + RequestBody body = null; + try { + body = RequestBody.fromBytes(inputStream.readAllBytes()); + } catch (IOException e) { + throw new BlobStoreException("Failed to read all bytes from given InputStream", e); + } - CompletableFuture responseFuture = s3AsyncClient.putObject( + s3Client.putObject( r -> { r.bucket(containerName).key(objectName); if (contentType != null) { @@ -71,11 +71,6 @@ public long uploadBlob(String objectName, InputStream inputStream, String conten }, body ); - - // Provide the stream of data to be uploaded. - long v = body.writeInputStream(inputStream); - - PutObjectResponse r = responseFuture.join(); // Wait for the response. return UNKNOWN_LATEST_VERSION; } @@ -91,13 +86,14 @@ public long uploadNewBlob(String objectName, InputStream inputStream) { } private boolean objectExists(String containerName, String objectName) { - return s3AsyncClient - .headObject(headObjectRequest -> + try { + s3Client.headObject(headObjectRequest -> headObjectRequest.bucket(containerName).key(objectName) - ) - .exceptionally(throwable -> null) - .thenApply(Objects::nonNull) - .join(); + ); + return true; + } catch (NoSuchKeyException e) { + return false; + } } @Override @@ -107,15 +103,13 @@ public void copyBlob( String targetContainerName, String targetObjectName ) { - s3AsyncClient - .copyObject(copyObjectRequest -> - copyObjectRequest - .sourceBucket(sourceContainerName) - .sourceKey(sourceObjectName) - .destinationBucket(targetContainerName) - .destinationKey(targetObjectName) - ) - .join(); + s3Client.copyObject(copyObjectRequest -> + copyObjectRequest + .sourceBucket(sourceContainerName) + .sourceKey(sourceObjectName) + .destinationBucket(targetContainerName) + .destinationKey(targetObjectName) + ); } @Override @@ -164,7 +158,7 @@ private String trimPrefix(String prefix, String s) { @Override public boolean delete(String objectName) { - s3AsyncClient.deleteObject(r -> r.bucket(containerName).key(objectName)).join(); + s3Client.deleteObject(r -> r.bucket(containerName).key(objectName)); return !objectExists(containerName, objectName); } @@ -179,11 +173,10 @@ public boolean deleteAllFilesInFolder(String folder) { .stream() .map(s3Object -> ObjectIdentifier.builder().key(s3Object.key()).build()) .toList(); - return s3AsyncClient + return s3Client .deleteObjects(deleteObjectsRequest -> deleteObjectsRequest.delete(delete -> delete.objects(objectIdentifiers)) ) - .join() .errors() .isEmpty(); } @@ -200,20 +193,12 @@ private List iteratePrefix( String prefix, Function, U> mapper ) { - ListObjectsV2Publisher publisher = s3AsyncClient.listObjectsV2Paginator(req -> + ListObjectsV2Iterable iterable = s3Client.listObjectsV2Paginator(req -> req.bucket(containerName).prefix(prefix) ); List pageResults = new ArrayList<>(); - CompletableFuture future = publisher.subscribe(res -> - pageResults.add(mapper.apply(res.contents())) - ); - try { - future.get(); - } catch (InterruptedException | ExecutionException ignored) { - // ignored on purpose - } - + iterable.forEach(a -> pageResults.add(mapper.apply(a.contents()))); return pageResults; } diff --git a/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryConfig.java b/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryConfig.java index 8fe33300..cf981979 100644 --- a/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryConfig.java +++ b/src/main/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryConfig.java @@ -17,6 +17,7 @@ package no.entur.uttu.export.blob; import java.net.URI; +import java.time.Duration; import org.rutebanken.helper.storage.repository.BlobStoreRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -26,9 +27,10 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; @Configuration @Profile("s3-blobstore") @@ -43,11 +45,9 @@ public class S3BlobStoreRepositoryConfig { @Bean BlobStoreRepository blobStoreRepository( @Value("${blobstore.s3.bucket}") String containerName, - S3AsyncClient s3AsyncClient + S3Client s3Client ) { - S3BlobStoreRepository s3BlobStoreRepository = new S3BlobStoreRepository( - s3AsyncClient - ); + S3BlobStoreRepository s3BlobStoreRepository = new S3BlobStoreRepository(s3Client); s3BlobStoreRepository.setContainerName(containerName); return s3BlobStoreRepository; } @@ -70,14 +70,22 @@ public AwsCredentialsProvider cloudCredentials() { } @Bean - public S3AsyncClient s3AsyncClient(AwsCredentialsProvider credentialsProvider) { - S3CrtAsyncClientBuilder b = S3AsyncClient - .crtBuilder() + public S3Client s3Client(AwsCredentialsProvider credentialsProvider) { + S3ClientBuilder builder = S3Client + .builder() + .region(Region.of(region)) .credentialsProvider(credentialsProvider) - .region(Region.of(region)); + .overrideConfiguration( + ClientOverrideConfiguration + .builder() + .apiCallAttemptTimeout(Duration.ofSeconds(15)) + .apiCallTimeout(Duration.ofSeconds(15)) + .retryPolicy(retryPolicy -> retryPolicy.numRetries(5)) + .build() + ); if (endpointOverride != null) { - b = b.endpointOverride(URI.create(endpointOverride)); + builder = builder.endpointOverride(URI.create(endpointOverride)); } - return b.build(); + return builder.build(); } } diff --git a/src/test/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryTest.java b/src/test/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryTest.java index 4cb4b873..fab4e6ad 100644 --- a/src/test/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryTest.java +++ b/src/test/java/no/entur/uttu/export/blob/S3BlobStoreRepositoryTest.java @@ -2,7 +2,6 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutionException; import no.entur.uttu.UttuIntegrationTest; import org.jetbrains.annotations.NotNull; import org.junit.Assert; @@ -19,10 +18,10 @@ import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; @Testcontainers @ActiveProfiles({ "s3-blobstore" }) @@ -33,7 +32,7 @@ public class S3BlobStoreRepositoryTest extends UttuIntegrationTest { private static LocalStackContainer localStack; @Autowired - private S3AsyncClient s3AsyncClient; + private S3Client s3Client; @Autowired private S3BlobStoreRepository blobStore; @@ -50,19 +49,12 @@ static void blobStoreProperties(DynamicPropertyRegistry registry) { registry.add("blobstore.s3.bucket", () -> TEST_BUCKET); } - private void createBucket(String bucketName) - throws ExecutionException, InterruptedException { - s3AsyncClient - .headBucket(request -> request.bucket(bucketName)) - .exceptionally(throwable -> { - if (throwable.getCause() instanceof NoSuchBucketException) { - s3AsyncClient - .createBucket(CreateBucketRequest.builder().bucket(bucketName).build()) - .join(); - } - return null; - }) - .get(); + private void createBucket(String bucketName) { + try { + s3Client.headBucket(request -> request.bucket(bucketName)); + } catch (NoSuchBucketException e) { + s3Client.createBucket(request -> request.bucket(bucketName)); + } } @BeforeClass @@ -114,9 +106,9 @@ public void canSetContentTypeForUpload() throws Exception { contentType ); assertBlobExists(TEST_BUCKET, "json", true); - HeadObjectResponse response = s3AsyncClient - .headObject(request -> request.bucket(TEST_BUCKET).key("json")) - .join(); + HeadObjectResponse response = s3Client.headObject(request -> + request.bucket(TEST_BUCKET).key("json") + ); Assert.assertEquals(contentType, response.contentType()); } @@ -180,12 +172,11 @@ public void canCopyAllBlobsWithSharedPrefix() throws Exception { return new ByteArrayInputStream(source.getBytes(StandardCharsets.UTF_8)); } - private void assertBlobExists(String bucket, String key, boolean exists) - throws ExecutionException, InterruptedException { - HeadObjectResponse response = s3AsyncClient - .headObject(request -> request.bucket(bucket).key(key)) - .exceptionally(throwable -> null) - .get(); + private void assertBlobExists(String bucket, String key, boolean exists) { + HeadObjectResponse response = null; + try { + response = s3Client.headObject(request -> request.bucket(bucket).key(key)); + } catch (NoSuchKeyException ignored) {} if (!exists && response != null) { Assert.fail(bucket + " / " + key + " exists"); }