forked from opensearch-project/OpenSearch
Recovery/Replication changes for Writable Warm Index #2 (Draft)
nisgoel-amazon wants to merge 16 commits into composite-directory-poc from replication-flow-poc-new
Changes from all 16 commits:

c5b021b Composite Directory POC (rayshrey)
46a63d5 Refactor TransferManager interface to RemoteStoreFileTrackerAdapter (rayshrey)
35f6f1e Implement block level fetch for Composite Directory (rayshrey)
ec27fbe Removed CACHE state from FileTracker (rayshrey)
f93655d Fixes after latest pull (rayshrey)
04bad8a Add new setting for warm, remove store type setting, FileTracker and … (rayshrey)
d09b75a Modify TransferManager - replace BlobContainer with Functional Interf… (rayshrey)
90ace4a Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockComposi… (rayshrey)
93d1f2a Modify constructors to avoid breaking public api contract and code re… (rayshrey)
3280a6d Add experimental annotations for newly created classes and review com… (rayshrey)
4ee744d Use ref count as a temporary measure to prevent file from eviction un… (rayshrey)
3292b36 Remove method level locks (rayshrey)
5013be9 Handle tmp file deletion (rayshrey)
7491c62 Nit fixes (rayshrey)
f7ed4cc Replication/Recovery changes for writable warm index (nisgoel-amazon)
7051168 Replication/Recovery changes for writable warm index (nisgoel-amazon)
...est/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java (new file, 297 additions & 0 deletions)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.indices.replication;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {

    protected static final String REPOSITORY_NAME = "test-remote-store-repo";
    protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

    protected Path segmentRepoPath;
    protected Path translogRepoPath;
    protected boolean clusterSettingsSuppliedByTest = false;

    @Before
    private void setup() {
        internalCluster().startClusterManagerOnlyNode();
    }

    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial")
            .build();
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        if (segmentRepoPath == null || translogRepoPath == null) {
            segmentRepoPath = randomRepoPath().toAbsolutePath();
            translogRepoPath = randomRepoPath().toAbsolutePath();
        }
        if (clusterSettingsSuppliedByTest) {
            return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
        } else {
            return Settings.builder()
                .put(super.nodeSettings(nodeOrdinal))
                .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
                // .put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1)
                .build();
        }
    }

    @Override
    protected Settings featureFlagSettings() {
        Settings.Builder featureSettings = Settings.builder();
        featureSettings.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true);

        return featureSettings.build();
    }

    @Override
    protected boolean addMockIndexStorePlugin() {
        return false;
    }

    public void testPrimaryStopped_ReplicaPromoted() throws Exception {
        super.testPrimaryStopped_ReplicaPromoted();
    }

    public void testRestartPrimary() throws Exception {
        super.testRestartPrimary();
    }

    public void testCancelPrimaryAllocation() throws Exception {
        super.testCancelPrimaryAllocation();
    }

    public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
        super.testReplicationAfterPrimaryRefreshAndFlush();
    }

    public void testIndexReopenClose() throws Exception {
        super.testIndexReopenClose();
    }

    public void testScrollWithConcurrentIndexAndSearch() throws Exception {
        super.testScrollWithConcurrentIndexAndSearch();
    }

    public void testMultipleShards() throws Exception {
        super.testMultipleShards();
    }

    public void testReplicationAfterForceMerge() throws Exception {
        super.testReplicationAfterForceMerge();
    }

    public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
        super.testReplicationAfterForceMergeOnPrimaryShardsOnly();
    }

    public void testNodeDropWithOngoingReplication() throws Exception {
        super.testNodeDropWithOngoingReplication();
    }

    public void testCancellation() throws Exception {
        super.testCancellation();
    }

    public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
        super.testStartReplicaAfterPrimaryIndexesDocs();
    }

    public void testDeleteOperations() throws Exception {
        super.testDeleteOperations();
    }

    public void testReplicationPostDeleteAndForceMerge() throws Exception {
        super.testReplicationPostDeleteAndForceMerge();
    }

    public void testUpdateOperations() throws Exception {
        super.testUpdateOperations();
    }

    public void testReplicaHasDiffFilesThanPrimary() throws Exception {
        super.testReplicaHasDiffFilesThanPrimary();
    }

    public void testDropPrimaryDuringReplication() throws Exception {
        super.testDropPrimaryDuringReplication();
    }

[Review comment on the method below: you don't need all these super calls, it will run all parent tests when you inherit it. See the sketch after this file listing.]

    public void testPressureServiceStats() throws Exception {
        super.testPressureServiceStats();
    }

    public void testScrollCreatedOnReplica() throws Exception {
        assumeFalse(
            "Skipping the test as it's not compatible with segment replication with remote store.",
            warmIndexSegmentReplicationEnabled()
        );
        super.testScrollCreatedOnReplica();
    }

    public void testScrollWithOngoingSegmentReplication() {
        assumeFalse(
            "Skipping the test as it's not compatible with segment replication with remote store.",
            warmIndexSegmentReplicationEnabled()
        );
    }

    public void testPitCreatedOnReplica() throws Exception {
        assumeFalse(
            "Skipping the test as it's not compatible with segment replication with remote store.",
            warmIndexSegmentReplicationEnabled()
        );
        super.testPitCreatedOnReplica();
    }

    public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
        super.testPrimaryReceivesDocsDuringReplicaRecovery();
    }

    public void testIndexWhileRecoveringReplica() throws Exception {
        super.testIndexWhileRecoveringReplica();
    }

    public void testRestartPrimary_NoReplicas() throws Exception {
        super.testRestartPrimary_NoReplicas();
    }

    public void testRealtimeGetRequestsSuccessful() {
        super.testRealtimeGetRequestsSuccessful();
    }

    public void testRealtimeGetRequestsUnsuccessful() {
        super.testRealtimeGetRequestsUnsuccessful();
    }

    public void testRealtimeMultiGetRequestsSuccessful() {
        super.testRealtimeMultiGetRequestsSuccessful();
    }

    public void testRealtimeMultiGetRequestsUnsuccessful() {
        super.testRealtimeMultiGetRequestsUnsuccessful();
    }

    public void testRealtimeTermVectorRequestsSuccessful() throws IOException {
        super.testRealtimeTermVectorRequestsSuccessful();
    }

    public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {
        super.testRealtimeTermVectorRequestsUnSuccessful();
    }

    public void testReplicaAlreadyAtCheckpoint() throws Exception {
        super.testReplicaAlreadyAtCheckpoint();
    }

    public void testCancellationDuringGetCheckpointInfo() {
        assumeFalse(
            "Skipping the test as it's not compatible with segment replication with remote store.",
            warmIndexSegmentReplicationEnabled()
        );
    }

    public void testCancellationDuringGetSegments() {
        assumeFalse(
            "Skipping the test as it's not compatible with segment replication with remote store.",
            warmIndexSegmentReplicationEnabled()
        );
    }

    protected boolean warmIndexSegmentReplicationEnabled() {
        return !Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), "partial");
    }

    @After
    public void teardown() {
        clusterSettingsSuppliedByTest = false;
        assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
        assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
        clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
        clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
    }

    public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
        Map<String, String> nodeAttributes = node.getAttributes();
        String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));

        String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
        Map<String, String> settingsMap = node.getAttributes()
            .keySet()
            .stream()
            .filter(key -> key.startsWith(settingsAttributeKeyPrefix))
            .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));

        Settings.Builder settings = Settings.builder();
        settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
        settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);

        return new RepositoryMetadata(name, type, settings.build());
    }

    public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
        RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
            .state()
            .metadata()
            .custom(RepositoriesMetadata.TYPE);
        RepositoryMetadata actualRepository = repositories.repository(repositoryName);

        final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
        final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);

        for (String nodeName : internalCluster().getNodeNames()) {
            ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
            DiscoveryNode node = clusterService.localNode();
            RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);

            // Validate that all the restricted settings are intact on all the nodes.
            repository.getRestrictedSystemRepositorySettings()
                .stream()
                .forEach(
                    setting -> assertEquals(
                        String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
                        setting.get(actualRepository.settings()),
                        setting.get(expectedRepository.settings())
                    )
                );
        }
    }
}
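
The review comment above generalizes to most of this file: the test runner picks up inherited test methods automatically, so a subclass of SegmentReplicationIT already reruns the entire parent suite against the warm-index settings, and only the tests that need skipping have to be overridden. A minimal sketch of that slimmer shape, with a hypothetical class name and only the warm-specific pieces retained (same imports as the file above assumed):

    public class WarmIndexSegmentReplicationSketchIT extends SegmentReplicationIT {

        // Inherited tests run against these settings automatically;
        // no per-test super-call overrides are required.
        @Override
        public Settings indexSettings() {
            return Settings.builder()
                .put(super.indexSettings())
                .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial")
                .build();
        }

        // Override only the tests that must be skipped for warm indices.
        @Override
        public void testScrollCreatedOnReplica() throws Exception {
            assumeFalse(
                "Skipping the test as it's not compatible with segment replication with remote store.",
                warmIndexSegmentReplicationEnabled()
            );
            super.testScrollCreatedOnReplica();
        }

        protected boolean warmIndexSegmentReplicationEnabled() {
            return !Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), "partial");
        }
    }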
server/src/internalClusterTest/java/org/opensearch/remotestore/CompositeDirectoryIT.java (new file, 89 additions & 0 deletions)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class CompositeDirectoryIT extends RemoteStoreBaseIntegTestCase {

    /*
     * Disabling the MockFSIndexStore plugin, as MockFSDirectoryFactory wraps the FSDirectory in an
     * OpenSearchMockDirectoryWrapper, which extends FilterDirectory (whereas FSDirectory extends BaseDirectory).
     * As a result of this wrapping, the local directory of the Composite Directory does not satisfy the
     * assertion that the local directory must be of type FSDirectory.
     */
    @Override
    protected boolean addMockIndexStorePlugin() {
        return false;
    }

    @Override
    protected Settings featureFlagSettings() {
        Settings.Builder featureSettings = Settings.builder();
        featureSettings.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true);

        return featureSettings.build();
    }

    public void testCompositeDirectory() throws Exception {
        Settings settings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial")
            .build();
        assertAcked(client().admin().indices().prepareCreate("test-idx-1").setSettings(settings).get());

        // Check if the Directory initialized for the IndexShard is of Composite Directory type
        IndexService indexService = internalCluster().getDataNodeInstance(IndicesService.class).indexService(resolveIndex("test-idx-1"));
        IndexShard shard = indexService.getShardOrNull(0);
        Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());
        assertTrue(directory instanceof CompositeDirectory);

        // Verify from the index settings that the data locality is partial
        GetIndexResponse getIndexResponse = client().admin()
            .indices()
            .getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
            .get();
        Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
        assertEquals("partial", indexSettings.get("index.store.data_locality"));

        // Index data and ensure the cluster does not turn red while indexing
        Map<String, Long> stats = indexData(10, false, "test-idx-1");
        refresh("test-idx-1");
        ensureGreen("test-idx-1");

        // Search and verify that the total docs indexed match the search hits
        SearchResponse searchResponse = client().prepareSearch("test-idx-1").setQuery(QueryBuilders.matchAllQuery()).get();
        assertHitCount(searchResponse, stats.get(TOTAL_OPERATIONS));
    }
}
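
As an aside on the class-level comment about disabling the mock plugin: the failed assertion is purely a Java type-hierarchy issue, not a behavioral one. A tiny standalone sketch (hypothetical class name and path, using only Lucene's public Directory types) of why a FilterDirectory wrapper never passes an FSDirectory type check:

    import java.nio.file.Path;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.FilterDirectory;

    public class DirectoryWrapperDemo {
        public static void main(String[] args) throws Exception {
            Path path = Path.of(args[0]); // any writable directory
            try (Directory real = FSDirectory.open(path)) {
                // Anonymous subclass stands in for OpenSearchMockDirectoryWrapper,
                // which likewise extends FilterDirectory rather than FSDirectory.
                Directory wrapped = new FilterDirectory(real) {
                };
                System.out.println(real instanceof FSDirectory);    // true
                System.out.println(wrapped instanceof FSDirectory); // false: the wrapper breaks the type check
            }
        }
    }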
Review comment: would prefer we add parameterization to SegmentReplicationIT or the remote store version that randomly enables writeable warm here vs duplicating the suite.
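
A sketch of how that suggestion might look, with the caveat that the override name and placement are assumptions rather than the reviewer's actual proposal: a randomized setting inside SegmentReplicationIT (or its remote-store variant) so a single suite exercises both store modes.

    // Hypothetical change inside SegmentReplicationIT, not part of this PR:
    @Override
    public Settings indexSettings() {
        Settings.Builder builder = Settings.builder().put(super.indexSettings());
        if (randomBoolean()) { // randomized-testing helper inherited from the base test case
            // Randomly exercise the writable-warm (partial locality) path.
            builder.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial");
        }
        return builder.build();
    }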