Skip to content

Commit

Permalink
Populate RecoveryState details for shallow snapshot restore (opensear…
Browse files Browse the repository at this point in the history
…ch-project#15353)

---------
Signed-off-by: Lakshya Taragi <[email protected]>
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
ltaragi authored and Sachin Kale committed Sep 2, 2024
1 parent 5653ed6 commit 0e04819
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
Expand All @@ -31,6 +33,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -63,6 +66,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -579,6 +584,37 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// ensure recovery details are non-zero
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(restoredIndexName1).execute().actionGet();
assertEquals(1, recoveryResponse.getTotalShards());
assertEquals(1, recoveryResponse.getSuccessfulShards());
assertEquals(0, recoveryResponse.getFailedShards());
assertEquals(1, recoveryResponse.shardRecoveryStates().size());
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(restoredIndexName1));
assertEquals(1, recoveryResponse.shardRecoveryStates().get(restoredIndexName1).size());

RecoveryState recoveryState = recoveryResponse.shardRecoveryStates().get(restoredIndexName1).get(0);
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
assertEquals(0, recoveryState.getShardId().getId());
assertTrue(recoveryState.getPrimary());
assertEquals(RecoverySource.Type.SNAPSHOT, recoveryState.getRecoverySource().getType());
assertThat(recoveryState.getIndex().time(), greaterThanOrEqualTo(0L));

// ensure populated file details
assertTrue(recoveryState.getIndex().totalFileCount() > 0);
assertTrue(recoveryState.getIndex().totalRecoverFiles() > 0);
assertTrue(recoveryState.getIndex().recoveredFileCount() > 0);
assertThat(recoveryState.getIndex().recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(recoveryState.getIndex().recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
assertFalse(recoveryState.getIndex().fileDetails().isEmpty());

// ensure populated bytes details
assertTrue(recoveryState.getIndex().recoveredBytes() > 0L);
assertTrue(recoveryState.getIndex().totalBytes() > 0L);
assertTrue(recoveryState.getIndex().totalRecoverBytes() > 0L);
assertThat(recoveryState.getIndex().recoveredBytesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(recoveryState.getIndex().recoveredBytesPercent(), lessThanOrEqualTo(100.0f));

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -5130,10 +5130,23 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
}
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();

try {
final Directory storeDirectory;
if (recoveryState.getStage() == RecoveryState.Stage.INDEX) {
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
}
}
} else {
storeDirectory = store.directory();
}

String segmentsNFile = copySegmentFiles(
storeDirectory,
sourceRemoteDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2834,9 +2834,9 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
RecoverySource.ExistingStoreRecoverySource.INSTANCE
);
routing = ShardRoutingHelper.newWithRestoreSource(routing, new RecoverySource.EmptyStoreRecoverySource());

target = reinitShard(target, routing);

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("from snapshot", new RecoveryState(routing, localNode, null));
target.syncSegmentsFromGivenRemoteSegmentStore(false, tempRemoteSegmentDirectory, primaryTerm, commitGeneration);
RemoteSegmentStoreDirectory remoteStoreDirectory = ((RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) target
.remoteStore()
Expand Down

0 comments on commit 0e04819

Please sign in to comment.