Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recovery/Replication changes for Writable Warm Index #2

Draft
wants to merge 16 commits into
base: composite-directory-poc
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Add composite directory implementation and integrate it with FileCache ([#12782](https://github.com/opensearch-project/OpenSearch/pull/12782))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {
// NOTE(review): prefer adding parameterization to SegmentReplicationIT (or the remote-store
// variant) that randomly enables writeable warm, rather than duplicating the whole suite here.
// Names of the remote-store repositories used for segments and translog respectively.
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

// Repository paths, lazily initialized once in nodeSettings() so every node shares them.
protected Path segmentRepoPath;
protected Path translogRepoPath;
// When true, the test supplies its own cluster settings and nodeSettings() skips the
// default remote-store configuration. Reset to false in teardown().
protected boolean clusterSettingsSuppliedByTest = false;

/**
 * Starts a dedicated cluster-manager-only node before each test.
 * Declared public because JUnit requires {@code @Before} lifecycle methods to be
 * public; a private lifecycle method can be rejected or silently skipped by the runner.
 */
@Before
public void setup() {
    internalCluster().startClusterManagerOnlyNode();
}

/**
 * Extends the parent suite's index settings with partial store locality.
 * Assumes "partial" data locality is what makes the index a writable warm index — TODO confirm.
 */
@Override
public Settings indexSettings() {
    Settings.Builder builder = Settings.builder().put(super.indexSettings());
    builder.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial");
    return builder.build();
}

/**
 * Node settings for the suite. Lazily creates the segment and translog repository paths
 * (once, so all nodes point at the same repositories), then layers the remote-store
 * cluster settings on top of the parent settings — unless the running test supplied its
 * own cluster settings via {@link #clusterSettingsSuppliedByTest}.
 *
 * @param nodeOrdinal ordinal of the node being configured
 * @return the effective settings for that node
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    if (segmentRepoPath == null || translogRepoPath == null) {
        segmentRepoPath = randomRepoPath().toAbsolutePath();
        translogRepoPath = randomRepoPath().toAbsolutePath();
    }
    if (clusterSettingsSuppliedByTest) {
        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
    }
    return Settings.builder()
        .put(super.nodeSettings(nodeOrdinal))
        .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
        .build();
}

/** Enables the writeable-remote-index feature flag for every node in this suite. */
@Override
protected Settings featureFlagSettings() {
    return Settings.builder().put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true).build();
}

// Disabled: MockFSDirectoryFactory wraps the FSDirectory in an OpenSearchMockDirectoryWrapper
// (a FilterDirectory), which would break Composite Directory's assertion that its local
// directory is an FSDirectory — same reasoning as the override in CompositeDirectoryIT.
@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

// NOTE(review): these overrides only delegate to the parent; JUnit already runs all inherited
// test methods, so the explicit super calls below are redundant and could be removed.
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
super.testPrimaryStopped_ReplicaPromoted();
}

public void testRestartPrimary() throws Exception {
super.testRestartPrimary();
}

public void testCancelPrimaryAllocation() throws Exception {
super.testCancelPrimaryAllocation();
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
super.testReplicationAfterPrimaryRefreshAndFlush();
}

public void testIndexReopenClose() throws Exception {
super.testIndexReopenClose();
}

public void testScrollWithConcurrentIndexAndSearch() throws Exception {
super.testScrollWithConcurrentIndexAndSearch();
}

public void testMultipleShards() throws Exception {
super.testMultipleShards();
}

public void testReplicationAfterForceMerge() throws Exception {
super.testReplicationAfterForceMerge();
}

public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
super.testReplicationAfterForceMergeOnPrimaryShardsOnly();
}

public void testNodeDropWithOngoingReplication() throws Exception {
super.testNodeDropWithOngoingReplication();
}

public void testCancellation() throws Exception {
super.testCancellation();
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
super.testStartReplicaAfterPrimaryIndexesDocs();
}

public void testDeleteOperations() throws Exception {
super.testDeleteOperations();
}

public void testReplicationPostDeleteAndForceMerge() throws Exception {
super.testReplicationPostDeleteAndForceMerge();
}

public void testUpdateOperations() throws Exception {
super.testUpdateOperations();
}

public void testReplicaHasDiffFilesThanPrimary() throws Exception {
super.testReplicaHasDiffFilesThanPrimary();
}

public void testDropPrimaryDuringReplication() throws Exception {
super.testDropPrimaryDuringReplication();
}

public void testPressureServiceStats() throws Exception {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need all these super calls, it will run all parent tests when you inherit it.

super.testPressureServiceStats();
}

// The overrides below guard parent tests that are incompatible with warm-index segment
// replication: assumeFalse() skips the test when warmIndexSegmentReplicationEnabled()
// reports true. NOTE(review): verify the polarity of that helper — as written for this
// suite (which always forces "partial" locality) confirm these guards actually skip.
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);
super.testScrollCreatedOnReplica();
}

// Intentionally empty body: the parent scenario is skipped entirely (no super call).
public void testScrollWithOngoingSegmentReplication() {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);
}

public void testPitCreatedOnReplica() throws Exception {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);
super.testPitCreatedOnReplica();
}

public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
super.testPrimaryReceivesDocsDuringReplicaRecovery();
}

public void testIndexWhileRecoveringReplica() throws Exception {
super.testIndexWhileRecoveringReplica();
}

public void testRestartPrimary_NoReplicas() throws Exception {
super.testRestartPrimary_NoReplicas();
}

public void testRealtimeGetRequestsSuccessful() {
super.testRealtimeGetRequestsSuccessful();
}

public void testRealtimeGetRequestsUnsuccessful() {
super.testRealtimeGetRequestsUnsuccessful();
}

public void testRealtimeMultiGetRequestsSuccessful() {
super.testRealtimeMultiGetRequestsSuccessful();
}

public void testRealtimeMultiGetRequestsUnsuccessful() {
super.testRealtimeMultiGetRequestsUnsuccessful();
}

public void testRealtimeTermVectorRequestsSuccessful() throws IOException {
super.testRealtimeTermVectorRequestsSuccessful();
}

public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {
super.testRealtimeTermVectorRequestsUnSuccessful();
}

public void testReplicaAlreadyAtCheckpoint() throws Exception {
super.testReplicaAlreadyAtCheckpoint();
}

// Intentionally empty body: the parent scenario is skipped entirely (no super call).
public void testCancellationDuringGetCheckpointInfo() {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);
}

// Intentionally empty body: the parent scenario is skipped entirely (no super call).
public void testCancellationDuringGetSegments() {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);
}

/**
 * @return true when the index under test uses "partial" store locality, i.e. warm-index
 *         segment replication is in effect for this suite.
 */
protected boolean warmIndexSegmentReplicationEnabled() {
    // Fixed inverted check: "partial" locality IS the warm-index configuration, so this must
    // return true when locality equals "partial". The previous negation made every
    // assumeFalse(..., warmIndexSegmentReplicationEnabled()) guard in this class a no-op,
    // because indexSettings() always forces "partial".
    return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), "partial");
}

// Per-test cleanup: reset the settings-override flag, verify the remote-store repositories
// are still correctly registered on every node, then clean up both repositories.
@After
public void teardown() {
clusterSettingsSuppliedByTest = false;
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
}

/**
 * Reconstructs the expected {@link RepositoryMetadata} for {@code name} from the
 * remote-store attributes published on {@code node}.
 *
 * @param node the node whose attributes describe the repository
 * @param name the repository name
 * @return repository metadata built from the node's attributes, marked as a system repository
 */
public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
    Map<String, String> nodeAttributes = node.getAttributes();
    // Locale.ROOT so attribute-key formatting is independent of the JVM default locale
    // (Locale.getDefault() can mangle keys, e.g. under the Turkish locale).
    String type = nodeAttributes.get(String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));

    String settingsAttributeKeyPrefix = String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
    // substring (not replace) strips exactly the leading prefix, even if the prefix text
    // happens to appear again later in the key.
    Map<String, String> settingsMap = nodeAttributes.keySet()
        .stream()
        .filter(key -> key.startsWith(settingsAttributeKeyPrefix))
        .collect(Collectors.toMap(key -> key.substring(settingsAttributeKeyPrefix.length()), nodeAttributes::get));

    Settings.Builder settings = Settings.builder();
    settingsMap.forEach(settings::put);
    // Mark as a system repository, matching how nodes register remote-store repositories.
    settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);

    return new RepositoryMetadata(name, type, settings.build());
}

// Asserts that, on every node, the repository registered in cluster state carries the same
// restricted system-repository settings as the repository metadata rebuilt from that node's
// own attributes.
public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
// Actual repository metadata as recorded in cluster state (read from the first node).
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);
RepositoryMetadata actualRepository = repositories.repository(repositoryName);

final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);

for (String nodeName : internalCluster().getNodeNames()) {
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
DiscoveryNode node = clusterService.localNode();
// Expected metadata, derived from this node's remote-store attributes.
RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);

// Validate that all the restricted settings are intact on all the nodes.
repository.getRestrictedSystemRepositorySettings()
.stream()
.forEach(
setting -> assertEquals(
String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
setting.get(actualRepository.settings()),
setting.get(expectedRepository.settings())
)
);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class CompositeDirectoryIT extends RemoteStoreBaseIntegTestCase {

    /*
     * The MockFSIndexStore plugin is disabled because MockFSDirectoryFactory wraps the
     * FSDirectory in an OpenSearchMockDirectoryWrapper, which extends FilterDirectory
     * (whereas FSDirectory extends BaseDirectory). With that wrapping, Composite
     * Directory's assertion that its local directory is an FSDirectory would fail.
     */
    @Override
    protected boolean addMockIndexStorePlugin() {
        return false;
    }

    /** Enables the writeable-remote-index feature flag needed for partial data locality. */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true).build();
    }

    /**
     * Creates an index with partial data locality and verifies that:
     * (1) the shard's store is backed by a CompositeDirectory,
     * (2) the index settings report partial data locality, and
     * (3) indexing and search work end to end against that directory.
     */
    public void testCompositeDirectory() throws Exception {
        final String indexName = "test-idx-1";
        final Settings indexSettings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial")
            .build();
        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(indexSettings).get());

        // The store directory is wrapped in two FilterDirectory layers; unwrap both to
        // reach the directory the shard actually writes to.
        IndexService indexService = internalCluster().getDataNodeInstance(IndicesService.class).indexService(resolveIndex(indexName));
        IndexShard shard = indexService.getShardOrNull(0);
        FilterDirectory outerWrapper = (FilterDirectory) shard.store().directory();
        Directory innerDirectory = ((FilterDirectory) outerWrapper.getDelegate()).getDelegate();
        assertTrue(innerDirectory instanceof CompositeDirectory);

        // Confirm via the (defaulted) index settings that the data locality is partial.
        GetIndexResponse getIndexResponse = client().admin()
            .indices()
            .getIndex(new GetIndexRequest().indices(indexName).includeDefaults(true))
            .get();
        Settings effectiveSettings = getIndexResponse.settings().get(indexName);
        assertEquals("partial", effectiveSettings.get("index.store.data_locality"));

        // Index some documents and ensure the cluster does not turn red while doing so.
        Map<String, Long> indexingStats = indexData(10, false, indexName);
        refresh(indexName);
        ensureGreen(indexName);

        // Every indexed operation must be visible to a match-all search.
        SearchResponse searchResponse = client().prepareSearch(indexName).setQuery(QueryBuilders.matchAllQuery()).get();
        assertHitCount(searchResponse, indexingStats.get(TOTAL_OPERATIONS));
    }
}
Loading
Loading