Skip to content

Commit

Permalink
Refactor remote store flow to support any path type with bwc
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Mar 21, 2024
1 parent f3d2bee commit 21636af
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MapperService.MergeReason;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.remote.RemoteStorePathResolver;
import org.opensearch.index.remote.RemoteStorePathType;
import org.opensearch.index.remote.RemoteStorePathTypeResolver;
import org.opensearch.index.shard.IndexSettingProvider;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndexCreationException;
Expand All @@ -113,6 +113,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -170,7 +171,7 @@ public class MetadataCreateIndexService {
private AwarenessReplicaBalance awarenessReplicaBalance;

@Nullable
private final RemoteStorePathResolver remoteStorePathResolver;
private final RemoteStorePathTypeResolver remoteStorePathTypeResolver;

public MetadataCreateIndexService(
final Settings settings,
Expand Down Expand Up @@ -203,8 +204,8 @@ public MetadataCreateIndexService(

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
remoteStorePathResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathResolver(clusterService.getClusterSettings())
remoteStorePathTypeResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathTypeResolver(clusterService.getClusterSettings())
: null;
}

Expand Down Expand Up @@ -553,7 +554,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
tmpImdBuilder.system(isSystem);
addRemoteCustomData(tmpImdBuilder);
addRemoteStorePathTypeInCustomData(tmpImdBuilder, true);

// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetadata tempMetadata = tmpImdBuilder.build();
Expand All @@ -562,17 +563,18 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
return tempMetadata;
}

public void addRemoteCustomData(IndexMetadata.Builder tmpImdBuilder) {
if (remoteStorePathResolver != null) {
public void addRemoteStorePathTypeInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStorePathTypeResolver != null) {
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
Map<String, String> remoteCustomData = existingRemoteCustomData == null
? new HashMap<>()
: new HashMap<>(existingRemoteCustomData);
// Determine the path type for use using the remoteStorePathResolver.
String newPathType = remoteStorePathResolver.resolveType().toString();
String newPathType = remoteStorePathTypeResolver.getType().toString();
String oldPathType = remoteCustomData.put(RemoteStorePathType.NAME, newPathType);
assert !assertNullOldType || Objects.isNull(oldPathType);
logger.trace(() -> new ParameterizedMessage("Added new path type {}, replaced old path type {}", newPathType, oldPathType));
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
}
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ public synchronized IndexShard createShard(
remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
this.indexSettings.getUUID(),
shardId
shardId,
this.indexSettings.getRemoteStorePathType()

Check warning on line 511 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L511

Added line #L511 was not covered by tests
);
}
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.remote.RemoteStorePathType;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
Expand All @@ -59,6 +60,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1905,4 +1907,11 @@ public double getDocIdFuzzySetFalsePositiveProbability() {
public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePositiveProbability) {
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
}

public RemoteStorePathType getRemoteStorePathType() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
return remoteCustomData != null && remoteCustomData.containsKey(RemoteStorePathType.NAME)
? RemoteStorePathType.parseString(remoteCustomData.get(RemoteStorePathType.NAME))
: RemoteStorePathType.FIXED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@

package org.opensearch.index.remote;

import org.opensearch.common.blobstore.BlobPath;

import java.util.Locale;

import static org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory.LOCK_FILES;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;

/**
* Enumerates the types of remote store paths resolution techniques supported by OpenSearch.
* For more information, see <a href="https://github.com/opensearch-project/OpenSearch/issues/12567">Github issue #12567</a>.
Expand All @@ -18,13 +23,48 @@
*/
public enum RemoteStorePathType {

FIXED,
HASHED_PREFIX;
FIXED {
@Override
public BlobPath validateAndGeneratePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType) {
return basePath.add(indexUUID).add(shardId).add(dataCategory).add(dataType);
}
},
HASHED_PREFIX {
@Override
public BlobPath validateAndGeneratePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
// throw new UnsupportedOperationException("Not implemented"); --> Not using this for some tests
return basePath.add(indexUUID).add(shardId).add(dataCategory).add(dataType);

Check warning on line 37 in server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java#L37

Added line #L37 was not covered by tests
}
};

/**
* @param basePath base path of the underlying blob store repository
* @param indexUUID of the index
* @param shardId shard id
* @param dataCategory is either translog or segment
* @param dataType can be one of data, metadata or lock_files.
* @return the blob path for the underlying remote store path type.
*/
public BlobPath generatePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType) {
assertDataCategoryAndTypeCombination(dataCategory, dataType);
return validateAndGeneratePath(basePath, indexUUID, shardId, dataCategory, dataType);
}

abstract BlobPath validateAndGeneratePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType);

/**
* This method verifies that if the data category is translog, then the data type can not be lock_files. All other
* combination of data categories and data types are possible.
*/
private static void assertDataCategoryAndTypeCombination(String dataCategory, String dataType) {
assert dataCategory.equals(TRANSLOG) == false || dataType.equals(LOCK_FILES) == false;
}

public static RemoteStorePathType parseString(String remoteStoreBlobPathType) {
try {
return RemoteStorePathType.valueOf(remoteStoreBlobPathType.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
} catch (IllegalArgumentException | NullPointerException e) {

Check warning on line 67 in server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java#L67

Added line #L67 was not covered by tests
throw new IllegalArgumentException("Could not parse RemoteStorePathType for [" + remoteStoreBlobPathType + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
*
* @opensearch.internal
*/
public class RemoteStorePathResolver {
public class RemoteStorePathTypeResolver {

private final ClusterSettings clusterSettings;
private volatile RemoteStorePathType type;

public RemoteStorePathResolver(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
public RemoteStorePathTypeResolver(ClusterSettings clusterSettings) {
type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, this::setType);
}

public RemoteStorePathType resolveType() {
return clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
public RemoteStorePathType getType() {
return type;
}

public void setType(RemoteStorePathType type) {
this.type = type;

Check warning on line 33 in server/src/main/java/org/opensearch/index/remote/RemoteStorePathTypeResolver.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStorePathTypeResolver.java#L33

Added line #L33 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStorePathType;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
Expand Down Expand Up @@ -409,7 +410,8 @@ void recoverFromSnapshotAndRemoteStore(
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
remoteStoreRepository,
indexUUID,
shardId
shardId,
RemoteStorePathType.FIXED // TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
);
sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStorePathType;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
Expand Down Expand Up @@ -897,13 +898,15 @@ public static void remoteDirectoryCleanup(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
ShardId shardId,
RemoteStorePathType pathType
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
shardId,
pathType
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteStorePathType;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
Expand All @@ -23,6 +24,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
Expand All @@ -32,6 +34,8 @@
*/
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
private static final String SEGMENTS = "segments";
private final static String DATA_DIR = "data";
private final static String METADATA_DIR = "metadata";

private final Supplier<RepositoriesService> repositoriesService;

Expand All @@ -46,29 +50,38 @@ public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> reposito
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
String repositoryName = indexSettings.getRemoteStoreRepository();
String indexUUID = indexSettings.getIndex().getUUID();
return newDirectory(repositoryName, indexUUID, path.getShardId());
return newDirectory(repositoryName, indexUUID, path.getShardId(), indexSettings.getRemoteStorePathType());
}

public Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId) throws IOException {
public Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId, RemoteStorePathType pathType)
throws IOException {
assert Objects.nonNull(pathType);
try (Repository repository = repositoriesService.get().repository(repositoryName)) {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository);
BlobPath commonBlobPath = blobStoreRepository.basePath();
commonBlobPath = commonBlobPath.add(indexUUID).add(String.valueOf(shardId.id())).add(SEGMENTS);
BlobPath repositoryBasePath = blobStoreRepository.basePath();
String shardIdStr = String.valueOf(shardId.id());

// Derive the path for data directory of SEGMENTS
BlobPath dataBlobPath = pathType.generatePath(repositoryBasePath, indexUUID, shardIdStr, SEGMENTS, DATA_DIR);
RemoteDirectory dataDirectory = new RemoteDirectory(
blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("data")),
blobStoreRepository.blobStore().blobContainer(dataBlobPath),
blobStoreRepository::maybeRateLimitRemoteUploadTransfers,
blobStoreRepository::maybeRateLimitRemoteDownloadTransfers
);
RemoteDirectory metadataDirectory = new RemoteDirectory(
blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("metadata"))
);

// Derive the path for metadata directory of SEGMENTS
BlobPath mdBlobPath = pathType.generatePath(repositoryBasePath, indexUUID, shardIdStr, SEGMENTS, METADATA_DIR);
RemoteDirectory metadataDirectory = new RemoteDirectory(blobStoreRepository.blobStore().blobContainer(mdBlobPath));

// The path for lock is derived within the RemoteStoreLockManagerFactory
RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
repositoriesService.get(),
repositoryName,
indexUUID,
String.valueOf(shardId.id())
shardIdStr,
pathType
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.remote.RemoteStorePathType;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.function.Supplier;

/**
Expand All @@ -28,33 +28,30 @@
@PublicApi(since = "2.8.0")
public class RemoteStoreLockManagerFactory {
private static final String SEGMENTS = "segments";
private static final String LOCK_FILES = "lock_files";
public static final String LOCK_FILES = "lock_files";
private final Supplier<RepositoriesService> repositoriesService;

public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService) {
this.repositoriesService = repositoriesService;
}

public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId);
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId, RemoteStorePathType pathType) {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, pathType);
}

public static RemoteStoreMetadataLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
String shardId
) throws IOException {
String shardId,
RemoteStorePathType pathType
) {
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS);
RemoteBufferedOutputDirectory shardMDLockDirectory = createRemoteBufferedOutputDirectory(
repository,
shardLevelBlobPath,
LOCK_FILES
);

return new RemoteStoreMetadataLockManager(shardMDLockDirectory);
BlobPath repositoryBasePath = ((BlobStoreRepository) repository).basePath();
BlobPath lockDirectoryPath = pathType.generatePath(repositoryBasePath, indexUUID, shardId, SEGMENTS, LOCK_FILES);
BlobContainer lockDirectoryBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(lockDirectoryPath);
return new RemoteStoreMetadataLockManager(new RemoteBufferedOutputDirectory(lockDirectoryBlobContainer));
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be present to acquire/release lock", e);
}
Expand All @@ -65,14 +62,4 @@ public static RemoteStoreMetadataLockManager newLockManager(
public Supplier<RepositoriesService> getRepositoriesService() {
return repositoriesService;
}

private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory(
Repository repository,
BlobPath commonBlobPath,
String extention
) {
BlobPath extendedPath = commonBlobPath.add(extention);
BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath);
return new RemoteBufferedOutputDirectory(dataBlobContainer);
}
}
Loading

0 comments on commit 21636af

Please sign in to comment.