[Remote Store] Add multipart upload integration for translog and segment files #7119

Merged

Commits (18)
cd2f2dc  Add multipart upload integration for translog and segment files (raghuvanshraj, May 24, 2023)
8c13090  Addressing PR comments (raghuvanshraj, Jun 5, 2023)
a1b78b1  Removed areStreamsDecorated check from RemoteTransferContainer (raghuvanshraj, Jun 9, 2023)
a564200  Moved TranslogCheckedContainer to org.opensearch.index.translog (raghuvanshraj, Jun 9, 2023)
7c1aad7  Modified translog checksum calculation to update on every translog write (raghuvanshraj, Jun 9, 2023)
5afd572  Addressing PR comments (raghuvanshraj, Jun 15, 2023)
53ff2b6  Adding MultiStreamBlobContainer for multipart upload check (raghuvanshraj, Jun 16, 2023)
3004c7e  Renaming MultiStreamBlobContainer to VerifyingMultiStreamBlobContainer (raghuvanshraj, Jun 22, 2023)
303ab59  Removing isRemoteIntegrityEnabled method from BlobContainer (raghuvanshraj, Jun 27, 2023)
5e500e5  Updating docstring for VerifyingMultiStreamBlobContainer (raghuvanshraj, Jun 27, 2023)
c2634d0  Minor refactor (raghuvanshraj, Jul 3, 2023)
b6dc925  Removed blocking get in translog/segment upload flows (raghuvanshraj, Jul 3, 2023)
b3ff7bd  Split completion listener and rename method (raghuvanshraj, Jul 4, 2023)
8d32aa9  Running spotless checks (raghuvanshraj, Jul 4, 2023)
a979103  Added latchedListener (raghuvanshraj, Jul 5, 2023)
9b8d95c  Added offset in stream container and moved stream container to common… (raghuvanshraj, Jul 5, 2023)
74181c5  Fixed naming for MockFsBlobContainerVerifying (raghuvanshraj, Jul 5, 2023)
aad413e  Added overloaded uploadBlob methods in BlobStoreTransferService (raghuvanshraj, Jul 11, 2023)
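
The commits above add an offset to InputStreamContainer so that a single translog or segment file can be exposed as several part streams, one per multipart upload part. The sketch below is illustrative only and not part of this PR: the splitter class, its name, and the InputStreamContainer import path are assumptions, and the PR's own transfer path builds its part streams elsewhere (see the RemoteTransferContainer commits). It only shows how offset-based containers could be derived from a local file.

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Assumed import path; the actual package follows commit 9b8d95c ("moved stream container to common…").
import org.opensearch.common.io.InputStreamContainer;

/** Hypothetical helper, not part of the PR: splits a file into offset-based part streams. */
public final class PartStreamSplitter {

    private PartStreamSplitter() {}

    public static List<InputStreamContainer> split(Path file, long partSize) throws IOException {
        long fileSize = Files.size(file);
        List<InputStreamContainer> parts = new ArrayList<>();
        for (long offset = 0; offset < fileSize; offset += partSize) {
            long length = Math.min(partSize, fileSize - offset);
            // One independent channel per part, positioned at the part's offset.
            FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
            channel.position(offset);
            InputStream stream = Channels.newInputStream(channel);
            // The consumer is expected to read exactly getContentLength() bytes from this stream.
            parts.add(new InputStreamContainer(stream, length, offset));
        }
        return parts;
    }
}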
InputStreamContainer.java
@@ -19,16 +19,18 @@ public class InputStreamContainer {

     private final InputStream inputStream;
     private final long contentLength;
+    private final long offset;
 
     /**
      * Construct a new stream object
      *
      * @param inputStream The input stream that is to be encapsulated
      * @param contentLength The total content length that is to be read from the stream
      */
-    public InputStreamContainer(InputStream inputStream, long contentLength) {
+    public InputStreamContainer(InputStream inputStream, long contentLength, long offset) {
         this.inputStream = inputStream;
         this.contentLength = contentLength;
+        this.offset = offset;
     }
 
     /**
@@ -44,4 +46,11 @@ public InputStream getInputStream() {
     public long getContentLength() {
         return contentLength;
     }
+
+    /**
+     * @return offset of the source content.
+     */
+    public long getOffset() {
+        return offset;
+    }
 }
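
For reference, a minimal usage sketch of the updated container. This example class is hypothetical, the InputStreamContainer import path is assumed, and the values are arbitrary.

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Assumed import path for the class shown in the diff above.
import org.opensearch.common.io.InputStreamContainer;

public class InputStreamContainerExample {
    public static void main(String[] args) {
        byte[] source = "example translog payload".getBytes(StandardCharsets.UTF_8);
        int offset = 8;                       // where this part starts within the source
        int length = source.length - offset;  // how many bytes this part exposes

        InputStreamContainer part = new InputStreamContainer(
            new ByteArrayInputStream(source, offset, length),
            length,
            offset
        );

        // The container only carries the stream plus its length and offset; callers do the reading.
        System.out.println(part.getContentLength() + " bytes starting at offset " + part.getOffset());
    }
}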
Remote store integration test base class (file name not shown)
@@ -102,9 +102,7 @@ protected void putRepository(Path path) {
     protected void setupRepo() {
         internalCluster().startClusterManagerOnlyNode();
         absolutePath = randomRepoPath().toAbsolutePath();
-        assertAcked(
-            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
-        );
+        putRepository(absolutePath);
     }
 
     @After
RemoteStoreMultipartFileCorruptionIT.java (new file)
@@ -0,0 +1,111 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepository;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreMultipartFileCorruptionIT extends OpenSearchIntegTestCase {

protected static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
putRepository(absolutePath);
}

protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(
Settings.builder()
.put("location", path)
// custom setting for MockFsRepositoryPlugin
.put(MockFsRepository.TRIGGER_DATA_INTEGRITY_FAILURE.getKey(), true)
)
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected Settings remoteStoreIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put("index.refresh_interval", "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

public void testLocalFileCorruptionDuringUpload() {
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings());
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexSingleDoc();

client().admin()
.indices()
.prepareRefresh(INDEX_NAME)
.setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED)
.execute()
.actionGet();

// ensuring red cluster meaning shard has failed and is unassigned
ensureRed(INDEX_NAME);
}
}
RemoteStoreMultipartIT.java (new file)
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart;

import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;

import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreMultipartIT extends RemoteStoreIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}

@Override
protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(Settings.builder().put("location", path))
);
}
}
MockFsBlobStore.java (new file)
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.OpenSearchException;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.fs.FsBlobStore;

import java.io.IOException;
import java.nio.file.Path;

public class MockFsBlobStore extends FsBlobStore {

private final boolean triggerDataIntegrityFailure;

public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure) throws IOException {
super(bufferSizeInBytes, path, readonly);
this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
try {
return new MockFsVerifyingBlobContainer(this, path, buildAndCreate(path), triggerDataIntegrityFailure);
} catch (IOException ex) {
throw new OpenSearchException("failed to create blob container", ex);
}
}
}
MockFsRepository.java (new file)
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.fs.FsRepository;

public class MockFsRepository extends FsRepository {

public static Setting<Boolean> TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting(
"mock_fs_repository.trigger_data_integrity_failure",
false
);

private final boolean triggerDataIntegrityFailure;

public MockFsRepository(
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings());
}

@Override
protected BlobStore createBlobStore() throws Exception {
FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore();
return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly(), triggerDataIntegrityFailure);
}
}
MockFsRepositoryPlugin.java (new file)
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.RepositoryPlugin;
import org.opensearch.repositories.Repository;

import java.util.Collections;
import java.util.Map;

public class MockFsRepositoryPlugin extends Plugin implements RepositoryPlugin {

public static final String TYPE = "fs_multipart_repository";

@Override
public Map<String, Repository.Factory> getRepositories(
Environment env,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return Collections.singletonMap(
"fs_multipart_repository",
metadata -> new MockFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)
);
}
}