Skip to content

Commit

Permalink
Added transport action for bulk async shard fetch for primary shards (#…
Browse files Browse the repository at this point in the history
…8218)

* Added transport action for bulk async shard fetch for primary shards

Signed-off-by: sudarshan baliga <[email protected]>
Co-authored-by: Shivansh Arora <[email protected]>
  • Loading branch information
sudarshan-baliga and shiv0408 authored Feb 2, 2024
1 parent 90a815e commit 8cb4bb4
Show file tree
Hide file tree
Showing 6 changed files with 708 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.store.ShardAttributes;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster;
import static org.opensearch.test.OpenSearchIntegTestCase.resolveIndex;

public class GatewayRecoveryTestUtils {

public static DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>(clusterStateResponse.getState().nodes().getDataNodes().values());
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
return disNodesArr;
}

public static Map<ShardId, ShardAttributes> prepareRequestMap(String[] indices, int primaryShardCount) {
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = new HashMap<>();
for (String indexName : indices) {
final Index index = resolveIndex(indexName);
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(
client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName)
);
for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
}
}
return shardIdShardAttributesMap;
}

public static void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
Files.delete(item);
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand All @@ -60,6 +62,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster.RestartCallback;
Expand All @@ -85,6 +88,9 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.prepareRequestMap;
import static org.opensearch.gateway.GatewayService.RECOVER_AFTER_NODES_SETTING;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
Expand Down Expand Up @@ -734,4 +740,97 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
internalCluster().fullRestart();
ensureGreen("test");
}

public void testSingleShardFetchUsingBatchAction() {
String indexName = "test";
int numOfShards = 1;
prepareIndex(indexName, numOfShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);

ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();

TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
);
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(searchShardsResponse.getNodes()[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
}

public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
// start node
internalCluster().startNode();
String indexName1 = "test1";
String indexName2 = "test2";
int numShards = internalCluster().numDataNodes();
// assign one primary shard each to the data nodes
prepareIndex(indexName1, numShards);
prepareIndex(indexName2, numShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName1, indexName2 }, numShards);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
assertEquals(internalCluster().numDataNodes(), searchShardsResponse.getNodes().length);
TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
);
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
assertEquals(1, clusterSearchShardsGroup.getShards().length);
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(nodeId)
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
}
}

public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
String indexName = "test";
int numOfShards = 1;
prepareIndex(indexName, numOfShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName());
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
);
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
}

private void assertNodeGatewayStartedShardsHappyCase(
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
) {
assertNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
createIndex(
indexName,
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
index(indexName, "type", "1", Collections.emptyMap());
flush(indexName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ protected void configure() {
bind(GatewayService.class).asEagerSingleton();
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShardsBatch.class).asEagerSingleton();
bind(LocalAllocateDangledIndices.class).asEagerSingleton();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.shard.ShardStateMetadata;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;

import java.io.IOException;

/**
* This class has the common code used in {@link TransportNodesListGatewayStartedShards} and
* {@link TransportNodesListGatewayStartedShardsBatch} to get the shard info on the local node.
* <p>
* This class should not be used to add more functions and will be removed when the
* {@link TransportNodesListGatewayStartedShards} will be deprecated and all the code will be moved to
* {@link TransportNodesListGatewayStartedShardsBatch}
*
* @opensearch.internal
*/
public class TransportNodesGatewayStartedShardHelper {
public static TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard getShardInfoOnLocalNode(
Logger logger,
final ShardId shardId,
NamedXContentRegistry namedXContentRegistry,
NodeEnvironment nodeEnv,
IndicesService indicesService,
String shardDataPathInRequest,
Settings settings,
ClusterService clusterService
) throws IOException {
logger.trace("{} loading local shard state info", shardId);
ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(
logger,
namedXContentRegistry,
nodeEnv.availableShardPaths(shardId)
);
if (shardStateMetadata != null) {
if (indicesService.getShardOrNull(shardId) == null
&& shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) {
final String customDataPath;
if (shardDataPathInRequest != null) {
customDataPath = shardDataPathInRequest;
} else {
// TODO: Fallback for BWC with older OpenSearch versions.
// Remove once request.getCustomDataPath() always returns non-null
final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());
if (metadata != null) {
customDataPath = new IndexSettings(metadata, settings).customDataPath();
} else {
logger.trace("{} node doesn't have meta data for the requests index", shardId);
throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex());
}
}
// we don't have an open shard on the store, validate the files on disk are openable
ShardPath shardPath = null;
try {
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
} catch (Exception exception) {
final ShardPath finalShardPath = shardPath;
logger.trace(
() -> new ParameterizedMessage(
"{} can't open index for shard [{}] in path [{}]",
shardId,
shardStateMetadata,
(finalShardPath != null) ? finalShardPath.resolveIndex() : ""
),
exception
);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(
allocationId,
shardStateMetadata.primary,
null,
exception
);
}
}

logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
final IndexShard shard = indicesService.getShardOrNull(shardId);
return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(
allocationId,
shardStateMetadata.primary,
shard != null ? shard.getLatestReplicationCheckpoint() : null
);
}
logger.trace("{} no local shard info found", shardId);
return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(null, false, null);
}
}
Loading

0 comments on commit 8cb4bb4

Please sign in to comment.