Skip to content

Commit

Permalink
Add multipart upload integration for translog and segment files
Browse files Browse the repository at this point in the history
Signed-off-by: Raghuvansh Raj <[email protected]>
  • Loading branch information
raghuvanshraj committed May 24, 2023
1 parent c89ba18 commit 74518bc
Show file tree
Hide file tree
Showing 23 changed files with 1,224 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ public Settings indexSettings() {
.build();
}

// Registers an "fs" snapshot repository named REPOSITORY_NAME rooted at the given path.
// Overridable so subclasses can swap in a different repository type or extra settings.
protected void putRepository(Path path) {
    Settings.Builder repositorySettings = Settings.builder().put("location", path);
    assertAcked(clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(repositorySettings));
}

@Before
public void setup() {
    // A dedicated cluster-manager node must be up before the repository is registered.
    internalCluster().startClusterManagerOnlyNode();
    Path absolutePath = randomRepoPath().toAbsolutePath();
    // Register the repository exactly once, via the overridable hook so subclasses
    // (e.g. multipart variants) can substitute their own repository type/settings.
    // The previous inline assertAcked(...preparePutRepository...) duplicated this call.
    putRepository(absolutePath);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepository;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreMultipartFileCorruptionIT extends OpenSearchIntegTestCase {

protected static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
putRepository(absolutePath);
}

protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(
Settings.builder()
.put("location", path)
// custom setting for MockFsRepositoryPlugin
.put(MockFsRepository.TRIGGER_DATA_INTEGRITY_FAILURE.getKey(), true)
)
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected Settings remoteStoreIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put("index.refresh_interval", "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

public void testLocalFileCorruptionDuringUpload() {
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings());
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexSingleDoc();

client().admin()
.indices()
.prepareRefresh(INDEX_NAME)
.setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED)
.execute()
.actionGet();

// ensuring red cluster meaning shard has failed and is unassigned
ensureRed(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart;

import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;

import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreMultipartIT extends RemoteStoreIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}

@Override
protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(Settings.builder().put("location", path))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.write.WriteContext;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * FS-backed blob container that implements multi-stream ("multipart") upload by
 * reading the parts concurrently into one buffer, then writing the assembled file.
 * When {@code triggerDataIntegrityFailure} is set, segment-file uploads are failed
 * with a {@link CorruptIndexException} to simulate a data-integrity failure.
 */
public class MockFsBlobContainer extends FsBlobContainer {

    private static final int TRANSFER_TIMEOUT_MILLIS = 30000;

    // When true, fail segment-file uploads after transfer to exercise corruption handling.
    private final boolean triggerDataIntegrityFailure;

    public MockFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) {
        super(blobStore, blobPath, path);
        this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
    }

    @Override
    public boolean isMultiStreamUploadSupported() {
        return true;
    }

    /**
     * Transfers the blob described by {@code writeContext} using one thread per part.
     *
     * @param writeContext upload descriptor: file name, size, stream provider, finalizer
     * @return future completed normally on success, or exceptionally on a read failure
     *         or the injected integrity failure
     * @throws IOException on transfer timeout, interruption, short read totals, or
     *         failure writing the assembled file
     */
    @Override
    public CompletableFuture<Void> writeBlobByStreams(WriteContext writeContext) throws IOException {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();

        int nParts = 10;
        long partSize = writeContext.getFileSize() / nParts;
        StreamContext streamContext = writeContext.getStreamProvider(partSize);
        final Path file = path.resolve(writeContext.getFileName());
        byte[] buffer = new byte[(int) writeContext.getFileSize()];
        AtomicLong totalContentRead = new AtomicLong();
        CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
        for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
            int finalPartIdx = partIdx;
            Thread thread = new Thread(() -> {
                try {
                    InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
                    // try-with-resources: the original closed the stream only on the
                    // success path, leaking it when a read threw.
                    try (InputStream inputStream = inputStreamContainer.getInputStream()) {
                        long remainingContentLength = inputStreamContainer.getContentLength();
                        long offset = partSize * finalPartIdx;
                        while (remainingContentLength > 0) {
                            int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
                            if (readContentLength < 0) {
                                // EOF before the advertised content length: without this
                                // check the loop would spin forever on -1 returns.
                                throw new IOException(
                                    "Unexpected end of stream for part " + finalPartIdx + " of file " + writeContext.getFileName()
                                );
                            }
                            totalContentRead.addAndGet(readContentLength);
                            remainingContentLength -= readContentLength;
                            offset += readContentLength;
                        }
                    }
                } catch (IOException e) {
                    completableFuture.completeExceptionally(e);
                } finally {
                    latch.countDown();
                }
            });
            thread.start();
        }
        try {
            if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers and keep the cause chained.
            Thread.currentThread().interrupt();
            throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName(), e);
        }
        try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
            outputStream.write(buffer);
        }
        if (writeContext.getFileSize() != totalContentRead.get()) {
            throw new IOException(
                "Incorrect content length read for file "
                    + writeContext.getFileName()
                    + ", actual file size: "
                    + writeContext.getFileSize()
                    + ", bytes read: "
                    + totalContentRead.get()
            );
        }

        try {
            // bulks need to succeed for segment files to be generated
            if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) {
                completableFuture.completeExceptionally(
                    new RuntimeException(
                        new CorruptIndexException(
                            "Data integrity check failure for file: " + writeContext.getFileName(),
                            writeContext.getFileName()
                        )
                    )
                );
            } else {
                writeContext.getUploadFinalizer().accept(true);
                completableFuture.complete(null);
            }
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
        }

        return completableFuture;
    }

    // Heuristic: anything that is not a translog (.tlog) or checkpoint (.ckp) file
    // is treated as a Lucene segment file.
    private boolean isSegmentFile(String filename) {
        return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.OpenSearchException;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.fs.FsBlobStore;

import java.io.IOException;
import java.nio.file.Path;

/**
 * FS blob store that hands out {@link MockFsBlobContainer} instances, forwarding
 * the corruption-injection flag down to each container.
 */
public class MockFsBlobStore extends FsBlobStore {

    // Propagated to every container created by this store.
    private final boolean triggerDataIntegrityFailure;

    public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure) throws IOException {
        super(bufferSizeInBytes, path, readonly);
        this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
    }

    @Override
    public BlobContainer blobContainer(BlobPath path) {
        try {
            Path resolved = buildAndCreate(path);
            return new MockFsBlobContainer(this, path, resolved, triggerDataIntegrityFailure);
        } catch (IOException ex) {
            throw new OpenSearchException("failed to create blob container", ex);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.fs.FsRepository;

/**
 * FS repository variant whose blob store is a {@link MockFsBlobStore}, optionally
 * injecting data-integrity failures into segment-file uploads.
 */
public class MockFsRepository extends FsRepository {

    // Repository setting toggling the injected corruption; read from the repository's
    // own metadata settings. Declared final: it is a constant definition, not mutable state.
    public static final Setting<Boolean> TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting(
        "mock_fs_repository.trigger_data_integrity_failure",
        false
    );

    private final boolean triggerDataIntegrityFailure;

    public MockFsRepository(
        RepositoryMetadata metadata,
        Environment environment,
        NamedXContentRegistry namedXContentRegistry,
        ClusterService clusterService,
        RecoverySettings recoverySettings
    ) {
        super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
        triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings());
    }

    @Override
    protected BlobStore createBlobStore() throws Exception {
        // Reuse the parent's store configuration (buffer size, path, read-only flag)
        // but wrap it in the mock store so containers can inject failures.
        FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore();
        return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly(), triggerDataIntegrityFailure);
    }
}
Loading

0 comments on commit 74518bc

Please sign in to comment.