Merge branch 'main' into draft_changes_15954
Signed-off-by: Sumit Bansal <[email protected]>
sumitasr authored Sep 30, 2024
2 parents 3e7bca2 + 8ddb3ee commit 23b9c64
Showing 36 changed files with 1,855 additions and 89 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))

### Dependencies
8 changes: 8 additions & 0 deletions qa/remote-clusters/docker-compose.yml
@@ -72,3 +72,11 @@ services:
- "9600"
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
depends_on:
- opensearch-2
healthcheck:
test: ["CMD", "timeout", "1", "bash", "-c", "cat < /dev/null > /dev/tcp/localhost/9600"]
interval: 2s
timeout: 1s
retries: 5
start_period: 15s
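A note on the healthcheck added above: the test command uses bash's /dev/tcp redirection, so the container is considered healthy only when TCP port 9600 accepts a connection within the 1-second timeout. A rough, illustrative Java equivalent of that probe (host, port, and timeout mirror the compose file; the class itself is hypothetical):

import java.net.InetSocketAddress;
import java.net.Socket;

public class TcpProbe {
    // Mirrors `cat < /dev/null > /dev/tcp/localhost/9600`: connect, then close.
    public static void main(String[] args) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("localhost", 9600), 1000); // 1s timeout
            System.out.println("healthy");
        } catch (Exception e) {
            System.out.println("unhealthy"); // compose retries the probe up to 5 times
            System.exit(1);
        }
    }
}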
RemoteStorePinnedTimestampsGarbageCollectionIT.java
@@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
@@ -32,6 +33,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase {
@@ -288,6 +290,79 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception {
        });
    }

    public void testLiveIndexWithPinnedTimestampsMultiplePrimaryTerms() throws Exception {
        prepareCluster(1, 2, Settings.EMPTY);
        Settings indexSettings = Settings.builder()
            .put(remoteStoreIndexSettings(1, 1))
            .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3)
            .build();
        createIndex(INDEX_NAME, indexSettings);
        ensureYellowAndNoInitializingShards(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

        RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
            RemoteStorePinnedTimestampService.class,
            primaryNodeName(INDEX_NAME)
        );

        remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

        int numDocs = randomIntBetween(5, 10);
        for (int i = 0; i < numDocs; i++) {
            keepPinnedTimestampSchedulerUpdated();
            indexSingleDoc(INDEX_NAME, true);
            if (i == 2) {
                RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
                remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
                RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
            }
        }

        ingestDocs();

        internalCluster().restartNode(primaryNodeName(INDEX_NAME));
        ensureGreen(INDEX_NAME);

        ingestDocs();

        String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
        String shardDataPath = getShardLevelBlobPath(
            client(),
            INDEX_NAME,
            BlobPath.cleanPath(),
            "0",
            TRANSLOG,
            DATA,
            translogPathFixedPrefix
        ).buildAsString();
        Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");

        assertBusy(() -> {
            List<Path> dataFiles = Files.list(translogDataPath).collect(Collectors.toList());
            assertFalse(dataFiles.isEmpty());
        });
    }

    private void ingestDocs() {
        int numDocs = randomIntBetween(15, 20);
        for (int i = 0; i < numDocs; i++) {
            indexSingleDoc(INDEX_NAME, false);
        }

        assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
        flushAndRefresh(INDEX_NAME);

        int numDocsPostFailover = randomIntBetween(15, 20);
        for (int i = 0; i < numDocsPostFailover; i++) {
            indexSingleDoc(INDEX_NAME, false);
        }

        flushAndRefresh(INDEX_NAME);
        assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
    }

    public void testIndexDeletionNoPinnedTimestamps() throws Exception {
        prepareCluster(1, 1, Settings.EMPTY);
        Settings indexSettings = Settings.builder()
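For readers tracing the trailing "/1" in translogDataPath above: remote translog data is laid out per primary term, so term 1's files sit in their own directory, and the test verifies they survive the primary-term bump caused by the node restart because a pinned timestamp still references them. An illustrative sketch of that layout only (the literal repo and shard path values are assumptions, not what the test computes):

import java.nio.file.Path;

// Layout sketch: <translog repo>/<shard-level blob path>/<primary term>/
Path repoRoot = Path.of("/tmp/translog-repo");          // stands in for translogRepoPath
String shardDataPath = "idx-uuid/0/translog/data/";     // stands in for getShardLevelBlobPath(...)
Path primaryTermOne = Path.of(repoRoot + "/" + shardDataPath + "/1"); // directory the test inspects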
@@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -312,6 +313,107 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots(
        // translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS);
    }

    public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exception {
        disableRepoConsistencyCheck("Remote store repository is being used in the test");
        final Path remoteStoreRepoPath = randomRepoPath();
        Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
        settings = Settings.builder()
            .put(settings)
            .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
            .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
            .build();
        String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
        internalCluster().startDataOnlyNodes(3, settings);
        final Client clusterManagerClient = internalCluster().clusterManagerClient();
        ensureStableCluster(4);

        RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
            RemoteStorePinnedTimestampService.class,
            clusterManagerName
        );
        remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
        RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

        final String snapshotRepoName = "snapshot-repo-name";
        final Path snapshotRepoPath = randomRepoPath();
        createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath));

        final String remoteStoreEnabledIndexName = "remote-index-1";
        final Settings remoteStoreEnabledIndexSettings = Settings.builder()
            .put(getRemoteStoreBackedIndexSettings())
            .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 2)
            .build();
        createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
        ensureGreen(remoteStoreEnabledIndexName);

        // Create 2 snapshots for primary term 1
        keepPinnedTimestampSchedulerUpdated();
        indexRandomDocs(remoteStoreEnabledIndexName, 5);
        createSnapshot(snapshotRepoName, "snap1");
        keepPinnedTimestampSchedulerUpdated();
        indexRandomDocs(remoteStoreEnabledIndexName, 5);
        createSnapshot(snapshotRepoName, "snap2");

        // Restart current primary to change the primary term
        internalCluster().restartNode(primaryNodeName(remoteStoreEnabledIndexName));
        ensureGreen(remoteStoreEnabledIndexName);

        // Create 2 snapshots for primary term 2
        keepPinnedTimestampSchedulerUpdated();
        indexRandomDocs(remoteStoreEnabledIndexName, 5);
        createSnapshot(snapshotRepoName, "snap3");
        keepPinnedTimestampSchedulerUpdated();
        indexRandomDocs(remoteStoreEnabledIndexName, 5);
        createSnapshot(snapshotRepoName, "snap4");

        String indexUUID = client().admin()
            .indices()
            .prepareGetSettings(remoteStoreEnabledIndexName)
            .get()
            .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

        Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
        Path shardPath = Path.of(String.valueOf(indexPath), "0");
        Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1");

        // Deleting snap1 will still keep files in primary term 1 due to snap2
        deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap1");
        assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);

        // Deleting snap2 will not remove primary term 1 as we need to trigger trimUnreferencedReaders once
        deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap2");
        assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);

        // Index a doc to trigger trimUnreferencedReaders
        RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
        keepPinnedTimestampSchedulerUpdated();
        indexRandomDocs(remoteStoreEnabledIndexName, 5);

        assertBusy(() -> assertFalse(Files.exists(translogPath)), 30, TimeUnit.SECONDS);
    }

    private void createSnapshot(String repoName, String snapshotName) {
        CreateSnapshotResponse createSnapshotResponse = client().admin()
            .cluster()
            .prepareCreateSnapshot(repoName, snapshotName)
            .setWaitForCompletion(true)
            .get();
        SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();

        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
        assertThat(snapshotInfo.successfulShards(), greaterThan(0));
        assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
        assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
    }

    private void deleteSnapshot(Client clusterManagerClient, String repoName, String snapshotName) {
        AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
            .cluster()
            .prepareDeleteSnapshot(repoName, snapshotName)
            .get();
        assertAcked(deleteSnapshotResponse);
    }

    private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
        Settings settings = Settings.builder()
            .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath))
14 changes: 13 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
@@ -462,6 +462,9 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestIndicesListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
@@ -805,9 +808,14 @@ private ActionFilters setupActionFilters(List<ActionPlugin> actionPlugins) {

    public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
        List<AbstractCatAction> catActions = new ArrayList<>();
        List<AbstractListAction> listActions = new ArrayList<>();
        Consumer<RestHandler> registerHandler = handler -> {
            if (handler instanceof AbstractCatAction) {
                catActions.add((AbstractCatAction) handler);
                if (handler instanceof AbstractListAction && ((AbstractListAction) handler).isActionPaginated()) {
                    listActions.add((AbstractListAction) handler);
                } else {
                    catActions.add((AbstractCatAction) handler);
                }
            }
            restController.registerHandler(handler);
        };
@@ -983,6 +991,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
        }
        registerHandler.accept(new RestTemplatesAction());

        // LIST API
        registerHandler.accept(new RestIndicesListAction());

        // Point in time API
        registerHandler.accept(new RestCreatePitAction());
        registerHandler.accept(new RestDeletePitAction());
@@ -1014,6 +1025,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
            }
        }
        registerHandler.accept(new RestCatAction(catActions));
        registerHandler.accept(new RestListAction(listActions));
        registerHandler.accept(new RestDecommissionAction());
        registerHandler.accept(new RestGetDecommissionStateAction());
        registerHandler.accept(new RestRemoteStoreStatsAction());
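For context on the registrations above: RestIndicesListAction is registered like any other handler, but because it reports itself as paginated it is collected into listActions and exposed through the umbrella RestListAction, mirroring how _cat handlers feed RestCatAction. A hedged usage sketch of the resulting _list/indices endpoint with the low-level REST client (the "v" parameter is assumed to behave as it does for _cat; pagination parameters are not shown because this diff does not name them):

import org.apache.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

public class ListIndicesExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("GET", "/_list/indices"); // paginated alternate to /_cat/indices
            request.addParameter("v", "true");                      // header row, as with _cat output (assumed)
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}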
TransportIndicesShardStoresAction.java
@@ -37,6 +37,7 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
@@ -88,6 +89,7 @@ public class TransportIndicesShardStoresAction extends TransportClusterManagerNo
    private static final Logger logger = LogManager.getLogger(TransportIndicesShardStoresAction.class);

    private final TransportNodesListGatewayStartedShards listShardStoresInfo;
    private final ClusterManagerMetrics clusterManagerMetrics;

    @Inject
    public TransportIndicesShardStoresAction(
@@ -96,7 +98,8 @@ public TransportIndicesShardStoresAction(
        ThreadPool threadPool,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        TransportNodesListGatewayStartedShards listShardStoresInfo
        TransportNodesListGatewayStartedShards listShardStoresInfo,
        ClusterManagerMetrics clusterManagerMetrics
    ) {
        super(
            IndicesShardStoresAction.NAME,
@@ -109,6 +112,7 @@ public TransportIndicesShardStoresAction(
            true
        );
        this.listShardStoresInfo = listShardStoresInfo;
        this.clusterManagerMetrics = clusterManagerMetrics;
    }

    @Override
@@ -154,7 +158,7 @@ protected void clusterManagerOperation(
        // we could fetch all shard store info from every node once (nNodes requests)
        // we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
        // for fetching shard stores info, that operates on a list of shards instead of a single shard
        new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start();
        new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener, clusterManagerMetrics).start();
    }

    @Override
@@ -175,27 +179,37 @@ private class AsyncShardStoresInfoFetches {
        private final ActionListener<IndicesShardStoresResponse> listener;
        private CountDown expectedOps;
        private final Queue<InternalAsyncFetch.Response> fetchResponses;
        private final ClusterManagerMetrics clusterManagerMetrics;

        AsyncShardStoresInfoFetches(
            DiscoveryNodes nodes,
            RoutingNodes routingNodes,
            Set<Tuple<ShardId, String>> shards,
            ActionListener<IndicesShardStoresResponse> listener
            ActionListener<IndicesShardStoresResponse> listener,
            ClusterManagerMetrics clusterManagerMetrics
        ) {
            this.nodes = nodes;
            this.routingNodes = routingNodes;
            this.shards = shards;
            this.listener = listener;
            this.fetchResponses = new ConcurrentLinkedQueue<>();
            this.expectedOps = new CountDown(shards.size());
            this.clusterManagerMetrics = clusterManagerMetrics;
        }

        void start() {
            if (shards.isEmpty()) {
                listener.onResponse(new IndicesShardStoresResponse());
            } else {
                for (Tuple<ShardId, String> shard : shards) {
                    InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo);
                    InternalAsyncFetch fetch = new InternalAsyncFetch(
                        logger,
                        "shard_stores",
                        shard.v1(),
                        shard.v2(),
                        listShardStoresInfo,
                        clusterManagerMetrics
                    );
                    fetch.fetchData(nodes, Collections.emptyMap());
                }
            }
@@ -213,9 +227,10 @@ private class InternalAsyncFetch extends AsyncShardFetch<NodeGatewayStartedShard
            String type,
            ShardId shardId,
            String customDataPath,
            TransportNodesListGatewayStartedShards action
            TransportNodesListGatewayStartedShards action,
            ClusterManagerMetrics clusterManagerMetrics
        ) {
            super(logger, type, shardId, customDataPath, action);
            super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);
        }

        @Override
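The changes above only thread ClusterManagerMetrics from the transport action down into AsyncShardFetch; the counters themselves are presumably incremented where fetch responses and node failures are processed. A minimal sketch of what that recording step could look like (the method name is hypothetical; Counter.add(double) is assumed from the telemetry Counter interface, not shown in this diff):

// Hypothetical recording hook inside AsyncShardFetch; not the actual method name.
void recordAsyncFetchOutcome(ClusterManagerMetrics clusterManagerMetrics, int successfulResponses, int failedNodes) {
    if (successfulResponses > 0) {
        clusterManagerMetrics.asyncFetchSuccessCounter.add(successfulResponses);
    }
    if (failedNodes > 0) {
        clusterManagerMetrics.asyncFetchFailureCounter.add(failedNodes);
    }
}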
ClusterManagerMetrics.java
@@ -34,6 +34,8 @@ public final class ClusterManagerMetrics {

    public final Counter leaderCheckFailureCounter;
    public final Counter followerChecksFailureCounter;
    public final Counter asyncFetchFailureCounter;
    public final Counter asyncFetchSuccessCounter;

    public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
        clusterStateAppliersHistogram = metricsRegistry.createHistogram(
@@ -71,6 +73,17 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
asyncFetchFailureCounter = metricsRegistry.createCounter(
"async.fetch.failure.count",
"Counter for number of failed async fetches",
COUNTER_METRICS_UNIT
);
asyncFetchSuccessCounter = metricsRegistry.createCounter(
"async.fetch.success.count",
"Counter for number of successful async fetches",
COUNTER_METRICS_UNIT
);

}

public void recordLatency(Histogram histogram, Double value) {
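The two new counters follow the same pattern as the existing leader-check and follower-check counters: created once against the MetricsRegistry in the constructor and incremented at the call site. A short usage sketch (assuming Counter exposes add(double), and that a MetricsRegistry instance is supplied by the node's telemetry plumbing):

ClusterManagerMetrics metrics = new ClusterManagerMetrics(metricsRegistry); // metricsRegistry supplied elsewhere
metrics.asyncFetchSuccessCounter.add(1.0); // one successful async shard fetch round
metrics.asyncFetchFailureCounter.add(1.0); // one failed async shard fetch round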