Skip to content

Commit

Permalink
Merge branch 'bugfix/446462_upload_large_files' into 'develop'
Browse files Browse the repository at this point in the history
bugfix/446462_upload_large_files

See merge request upm-inesdata/inesdata-connector!42
  • Loading branch information
ralconada-gmv committed Nov 18, 2024
2 parents fc5a8ca + 1541175 commit 362f537
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;

import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.upm.inesdata.validator;

import jakarta.json.JsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ public void initialize(ServiceExtensionContext context) {

Region region = Region.of(regionName);

// Crear una instancia de S3Service
S3Service s3Service = new S3Service(accessKey, secretKey, endpointOverride, region, bucketName);
S3Service s3Service = new S3Service(accessKey, secretKey, endpointOverride, region, bucketName, monitor);

var storageAssetApiController = new StorageAssetApiController(assetService, managementApiTransformerRegistry,
validator,s3Service,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
Expand All @@ -10,41 +11,82 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.ApiCoreSchema;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

import java.io.InputStream;

@OpenAPIDefinition(
info = @Info(description = "Manages the connector s3 assets.",
title = "S3 Asset API", version = "1"))
info = @Info(description = "Manages the connector S3 assets.",
title = "S3 Asset API", version = "1"))
@Tag(name = "S3Asset")
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
public interface StorageAssetApi {

/**
* Creates a new storage asset
* Uploads a chunk of a file for creating a new S3 asset.
*
* @param fileInputStream the input stream of the file to be uploaded
* @param fileDetail the details of the file to be uploaded
* @param assetJson the input stream of the asset metadata in JSON format
* @return JsonObject with the created asset
* @param contentDisposition The Content-Disposition header, which contains the file name.
* @param chunkIndex The index of the current chunk in the upload sequence.
* @param totalChunks The total number of chunks for this file.
* @param assetJson The asset info
* @param fileInputStream The input stream of the file chunk to be uploaded.
* @return JsonObject with status of the chunk upload or the next action.
*/
@Operation(description = "Creates a new S3 asset",
requestBody = @RequestBody(content = @Content(mediaType = MediaType.MULTIPART_FORM_DATA, schema = @Schema(
type = "object", requiredProperties = {"file", "json"}
))),
responses = {
@ApiResponse(responseCode = "200", description = "S3 asset was created successfully",
content = @Content(schema = @Schema(implementation = ApiCoreSchema.IdResponseSchema.class))),
@ApiResponse(responseCode = "400", description = "Request body was malformed",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))),
@ApiResponse(responseCode = "409", description = "Could not create asset, because an asset with that ID already exists",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
}
@POST
@Path("/upload-chunk")
@Operation(
description = "Uploads a chunk of a file to create a new S3 asset.",
requestBody = @RequestBody(content = @Content(mediaType = MediaType.MULTIPART_FORM_DATA, schema = @Schema(
type = "object", requiredProperties = {"file", "json"}
))),
responses = {
@ApiResponse(responseCode = "200", description = "Chunk uploaded successfully",
content = @Content(schema = @Schema(implementation = ApiCoreSchema.IdResponseSchema.class))),
@ApiResponse(responseCode = "400", description = "Request body was malformed",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))),
@ApiResponse(responseCode = "409", description = "Could not upload chunk, because of conflicts",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
}
)
JsonObject createStorageAsset(@FormDataParam("file") InputStream fileInputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail,
@FormDataParam("json") JsonObject assetJson);
JsonObject uploadChunk(
@Parameter(description = "Content-Disposition header, which contains the file name") @HeaderParam("Content-Disposition") String contentDisposition,
@Parameter(description = "Index of the current chunk in the upload sequence") @HeaderParam("Chunk-Index") int chunkIndex,
@Parameter(description = "Total number of chunks for this file") @HeaderParam("Total-Chunks") int totalChunks,
@FormDataParam("json") JsonObject assetJson,
@FormDataParam("file") InputStream fileInputStream);

/**
* Finalizes the upload and creates the asset using the provided metadata (JSON).
*
* @param assetJson The metadata for the asset in JSON format.
* @param fileName The name of the uploaded file.
* @return JsonObject with the created asset or the status.
*/
@POST
@Path("/finalize-upload")
@Operation(
description = "Finalizes the chunked upload and creates the S3 asset using the provided metadata.",
requestBody = @RequestBody(content = @Content(mediaType = MediaType.MULTIPART_FORM_DATA, schema = @Schema(
type = "object", requiredProperties = {"json", "fileName"}
))),
responses = {
@ApiResponse(responseCode = "200", description = "Asset created successfully",
content = @Content(schema = @Schema(implementation = ApiCoreSchema.IdResponseSchema.class))),
@ApiResponse(responseCode = "400", description = "Request body was malformed",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))),
@ApiResponse(responseCode = "409", description = "Asset could not be created due to conflict",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
}
)
JsonObject finalizeUpload(
@FormDataParam("fileName") String fileName,
@FormDataParam("json") JsonObject assetJson);
}
Original file line number Diff line number Diff line change
@@ -1,51 +1,46 @@
package org.upm.inesdata.storageasset.controller;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.servlet.annotation.MultipartConfig;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.IdResponse;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.constants.CoreConstants;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.util.string.StringUtils;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.spi.exception.InvalidRequestException;
import org.eclipse.edc.web.spi.exception.ValidationFailureException;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
import org.upm.inesdata.storageasset.service.S3Service;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.util.ArrayList;
import java.util.List;

import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE;
import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper;

@MultipartConfig
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@Path("/s3assets")
public class StorageAssetApiController implements StorageAssetApi {

private final TypeTransformerRegistry transformerRegistry;
private final AssetService service;
private final JsonObjectValidatorRegistry validator;
private final S3Service s3Service;

private final JsonLd jsonLd;

private final String bucketName;
private final String region;

public StorageAssetApiController(AssetService service, TypeTransformerRegistry transformerRegistry,
JsonObjectValidatorRegistry validator, S3Service s3Service, JsonLd jsonLd, String bucketName, String region) {
JsonObjectValidatorRegistry validator, S3Service s3Service, JsonLd jsonLd,
String bucketName, String region) {
this.transformerRegistry = transformerRegistry;
this.service = service;
this.validator = validator;
Expand All @@ -55,71 +50,91 @@ public StorageAssetApiController(AssetService service, TypeTransformerRegistry t
this.region = region;
}

/**
* Handles each chunk upload
*/
@POST
@Override
public JsonObject createStorageAsset(@FormDataParam("file") InputStream fileInputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail, @FormDataParam("json") JsonObject assetJson) {

String fileName = fileDetail.getFileName();

InputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
@Path("/upload-chunk")
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
public JsonObject uploadChunk(@HeaderParam("Content-Disposition") String contentDisposition,
@HeaderParam("Chunk-Index") int chunkIndex,
@HeaderParam("Total-Chunks") int totalChunks,
@FormDataParam("json") JsonObject assetJson,
@FormDataParam("file") InputStream fileInputStream) {

JsonObject expand = jsonLd.expand(assetJson).orElseThrow((f) -> new EdcException("Failed to expand request"));
// Validación

validator.validate(EDC_ASSET_TYPE, expand).orElseThrow(ValidationFailureException::new);
Asset asset = transformerRegistry.transform(expand, Asset.class).orElseThrow(InvalidRequestException::new);

// Transformación
var asset = transformerRegistry.transform(expand, Asset.class).orElseThrow(InvalidRequestException::new);
String fileName = contentDisposition.split("filename=")[1].replace("\"", "");
String folder = String.valueOf(asset.getDataAddress().getProperties().get(CoreConstants.EDC_NAMESPACE+"folder"));

// Guardar fichero en MinIO
// Calcular el tamaño del fichero manualmente
long contentLength = 0;
try {
contentLength = getFileSize(bufferedInputStream);
} catch (IOException e) {
throw new EdcException("Failed to process file size", e);
// Construct the S3 key for the file, keeping the folder structure
String fullKey;
if (folder == null || folder.trim().isEmpty() || "null".equals(folder)) {
fullKey = fileName; // No folder, use the file name
} else {
fullKey = folder.endsWith("/") ? folder + fileName : folder + "/" + fileName;
}
String folder = String.valueOf(asset.getDataAddress().getProperties().get(CoreConstants.EDC_NAMESPACE+"folder"));
String fullKey = StringUtils.isNullOrBlank(folder) || "null".equals(folder)?fileName:(folder.endsWith("/") ? folder + fileName : folder + "/" + fileName);
s3Service.uploadFile(fullKey,bufferedInputStream, contentLength);
try {
setStorageProperties(asset, fullKey);

// Creación de asset
var idResponse = service.create(asset)
.map(a -> IdResponse.Builder.newInstance().id(a.getId()).createdAt(a.getCreatedAt()).build())
.orElseThrow(exceptionMapper(Asset.class, asset.getId()));
// Handle file upload chunking
try {
s3Service.uploadChunk(fullKey, fileInputStream, chunkIndex, totalChunks);

return transformerRegistry.transform(idResponse, JsonObject.class)
.orElseThrow(f -> new EdcException(f.getFailureDetail()));
// Return successful upload status for each chunk
return Json.createObjectBuilder()
.add("status", "Chunk " + chunkIndex + " uploaded successfully")
.build();
} catch (IOException e) {
// If an error occurs, delete the file from S3
s3Service.deleteFile(fullKey);
throw new EdcException("Failed to read or upload chunk", e);
} catch (Exception e) {
// Eliminar el archivo en caso de fallo
// If an error occurs, delete the file from S3
s3Service.deleteFile(fullKey);
throw new EdcException("Failed to process multipart data", e);
throw new EdcException("Failed to process chunked data", e);
}
}

private long getFileSize(InputStream inputStream) throws IOException {
byte[] buffer = new byte[8192];
int bytesRead;
long size = 0;

inputStream.mark(Integer.MAX_VALUE);
/**
* Finalize upload and create asset with JSON data
*/
@POST
@Path("/finalize-upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
public JsonObject finalizeUpload(@FormDataParam("fileName") String fileName,
@FormDataParam("json") JsonObject assetJson) {

JsonObject expand = jsonLd.expand(assetJson).orElseThrow((f) -> new EdcException("Failed to expand request"));

while ((bytesRead = inputStream.read(buffer)) != -1) {
size += bytesRead;
}
validator.validate(EDC_ASSET_TYPE, expand).orElseThrow(ValidationFailureException::new);
Asset asset = transformerRegistry.transform(expand, Asset.class).orElseThrow(InvalidRequestException::new);

// Set storage properties for the asset
setStorageProperties(asset, fileName);

inputStream.reset();
// Create the asset in the service
IdResponse idResponse = service.create(asset)
.map(a -> IdResponse.Builder.newInstance().id(a.getId()).createdAt(a.getCreatedAt()).build())
.orElseThrow(exceptionMapper(Asset.class, asset.getId()));

return size;
// Return the response for the created asset
return transformerRegistry.transform(idResponse, JsonObject.class)
.orElseThrow(f -> new EdcException(f.getFailureDetail()));
}

/**
* Set necessary storage properties for the asset in S3.
*/
private void setStorageProperties(Asset asset, String fileName) {
asset.getPrivateProperties().put("storageAssetFile", fileName);
asset.getDataAddress().setKeyName(fileName);
asset.getDataAddress().setType("AmazonS3");
asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE+ "bucketName", bucketName);
asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE+"region", region);
asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE + "bucketName", bucketName);
asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE + "region", region);
}
}
Loading

0 comments on commit 362f537

Please sign in to comment.