diff --git a/CHANGELOG.md b/CHANGELOG.md index db44887a0e59f..cfbdf0f2bb5c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) +- Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java new file mode 100644 index 0000000000000..1964dc3136bb2 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java @@ -0,0 +1,297 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchIntegTestCase; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT { + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + + protected Path segmentRepoPath; + protected Path translogRepoPath; + protected boolean clusterSettingsSuppliedByTest = false; + + @Before + private void setup() { + internalCluster().startClusterManagerOnlyNode(); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial") + .build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + if (clusterSettingsSuppliedByTest) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); + } else { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + //.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1) + .build(); + } + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true); + + return featureSettings.build(); + } + + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + public void testPrimaryStopped_ReplicaPromoted() throws Exception { + super.testPrimaryStopped_ReplicaPromoted(); + } + + public void testRestartPrimary() throws Exception { + super.testRestartPrimary(); + } + + public void testCancelPrimaryAllocation() throws Exception { + super.testCancelPrimaryAllocation(); + } + + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { + super.testReplicationAfterPrimaryRefreshAndFlush(); + } + + public void testIndexReopenClose() throws Exception { + super.testIndexReopenClose(); + } + + public void testScrollWithConcurrentIndexAndSearch() throws Exception { + super.testScrollWithConcurrentIndexAndSearch(); + } + + public void testMultipleShards() throws Exception { + super.testMultipleShards(); + } + + public void testReplicationAfterForceMerge() throws Exception { + super.testReplicationAfterForceMerge(); + } + + public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception { + super.testReplicationAfterForceMergeOnPrimaryShardsOnly(); + } + + public void testNodeDropWithOngoingReplication() throws Exception { + super.testNodeDropWithOngoingReplication(); + } + + public void testCancellation() throws Exception { + super.testCancellation(); + } + + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { + super.testStartReplicaAfterPrimaryIndexesDocs(); + } + + public void testDeleteOperations() throws Exception { + super.testDeleteOperations(); + } + + public void testReplicationPostDeleteAndForceMerge() throws Exception { + super.testReplicationPostDeleteAndForceMerge(); + } + + public void testUpdateOperations() throws Exception { + super.testUpdateOperations(); + } + + public void testReplicaHasDiffFilesThanPrimary() throws Exception { + super.testReplicaHasDiffFilesThanPrimary(); + } + + public void testDropPrimaryDuringReplication() throws Exception { + super.testDropPrimaryDuringReplication(); + } + + public void testPressureServiceStats() throws Exception { + super.testPressureServiceStats(); + } + + public void testScrollCreatedOnReplica() throws Exception { + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + warmIndexSegmentReplicationEnabled() + ); + super.testScrollCreatedOnReplica(); + } + + public void testScrollWithOngoingSegmentReplication() { + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + warmIndexSegmentReplicationEnabled() + ); + } + + public void testPitCreatedOnReplica() throws Exception { + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + warmIndexSegmentReplicationEnabled() + ); + super.testPitCreatedOnReplica(); + } + + public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { + super.testPrimaryReceivesDocsDuringReplicaRecovery(); + } + + public void testIndexWhileRecoveringReplica() throws Exception { + super.testIndexWhileRecoveringReplica(); + } + + public void testRestartPrimary_NoReplicas() throws Exception { + super.testRestartPrimary_NoReplicas(); + } + + public void testRealtimeGetRequestsSuccessful() { + super.testRealtimeGetRequestsSuccessful(); + } + + public void testRealtimeGetRequestsUnsuccessful() { + super.testRealtimeGetRequestsUnsuccessful(); + } + + public void testRealtimeMultiGetRequestsSuccessful() { + super.testRealtimeMultiGetRequestsSuccessful(); + } + + public void testRealtimeMultiGetRequestsUnsuccessful() { + super.testRealtimeMultiGetRequestsUnsuccessful(); + } + + public void testRealtimeTermVectorRequestsSuccessful() throws IOException { + super.testRealtimeTermVectorRequestsSuccessful(); + } + + public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException { + super.testRealtimeTermVectorRequestsUnSuccessful(); + } + + public void testReplicaAlreadyAtCheckpoint() throws Exception { + super.testReplicaAlreadyAtCheckpoint(); + } + + public void testCancellationDuringGetCheckpointInfo() { + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + warmIndexSegmentReplicationEnabled() + ); + } + + public void testCancellationDuringGetSegments() { + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + warmIndexSegmentReplicationEnabled() + ); + } + + protected boolean warmIndexSegmentReplicationEnabled() { + return !Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), "partial"); + } + + @After + public void teardown() { + clusterSettingsSuppliedByTest = false; + assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME); + assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME); + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get(); + } + + public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { + Map nodeAttributes = node.getAttributes(); + String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); + + String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name); + Map settingsMap = node.getAttributes() + .keySet() + .stream() + .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) + .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key))); + + Settings.Builder settings = Settings.builder(); + settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); + settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true); + + return new RepositoryMetadata(name, type, settings.build()); + } + + public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) { + RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0]) + .state() + .metadata() + .custom(RepositoriesMetadata.TYPE); + RepositoryMetadata actualRepository = repositories.repository(repositoryName); + + final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName); + + for (String nodeName : internalCluster().getNodeNames()) { + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); + DiscoveryNode node = clusterService.localNode(); + RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName); + + // Validated that all the restricted settings are entact on all the nodes. + repository.getRestrictedSystemRepositorySettings() + .stream() + .forEach( + setting -> assertEquals( + String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()), + setting.get(actualRepository.settings()), + setting.get(expectedRepository.settings()) + ) + ); + } + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/CompositeDirectoryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/CompositeDirectoryIT.java new file mode 100644 index 0000000000000..3d52e6614f6a3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/CompositeDirectoryIT.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.CompositeDirectory; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.indices.IndicesService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Map; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +// Uncomment the below line to enable trace level logs for this test for better debugging +// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE") +public class CompositeDirectoryIT extends RemoteStoreBaseIntegTestCase { + + /* + Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) + As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory + */ + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true); + + return featureSettings.build(); + } + + public void testCompositeDirectory() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial") + .build(); + assertAcked(client().admin().indices().prepareCreate("test-idx-1").setSettings(settings).get()); + + // Check if the Directory initialized for the IndexShard is of Composite Directory type + IndexService indexService = internalCluster().getDataNodeInstance(IndicesService.class).indexService(resolveIndex("test-idx-1")); + IndexShard shard = indexService.getShardOrNull(0); + Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate()); + assertTrue(directory instanceof CompositeDirectory); + + // Verify from the cluster settings if the data locality is partial + GetIndexResponse getIndexResponse = client().admin() + .indices() + .getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true)) + .get(); + Settings indexSettings = getIndexResponse.settings().get("test-idx-1"); + assertEquals("partial", indexSettings.get("index.store.data_locality")); + + // Index data and ensure cluster does not turn red while indexing + Map stats = indexData(10, false, "test-idx-1"); + refresh("test-idx-1"); + ensureGreen("test-idx-1"); + + // Search and verify that the total docs indexed match the search hits + SearchResponse searchResponse3 = client().prepareSearch("test-idx-1").setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(searchResponse3, stats.get(TOTAL_OPERATIONS)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 121f8d935cf48..c8bbad2f0073b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -75,6 +75,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; @@ -986,6 +987,7 @@ static Settings aggregateIndexSettings( validateStoreTypeSettings(indexSettings); validateRefreshIntervalSettings(request.settings(), clusterSettings); validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings); + validateIndexStoreLocality(request.settings()); return indexSettings; } @@ -1679,4 +1681,16 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster } } + + public static void validateIndexStoreLocality(Settings indexSettings) { + if (indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.toString()) + .equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString()) + && !FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING)) { + throw new IllegalArgumentException( + "index.store.locality can be set to PARTIAL only if Feature Flag [" + + FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING.getKey() + + "] is set to true" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 980c432774f6e..f70b558273242 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -188,6 +188,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING, IndexModule.INDEX_STORE_TYPE_SETTING, + IndexModule.INDEX_STORE_LOCALITY_SETTING, IndexModule.INDEX_STORE_PRE_LOAD_SETTING, IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS, IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS, diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 3c4cb4fd596c1..4c494a6b35153 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -107,6 +107,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.logging.log4j.util.Strings.toRootUpperCase; + /** * IndexModule represents the central extension point for index level custom implementations like: *
    @@ -141,6 +143,17 @@ public final class IndexModule { Property.NodeScope ); + /** + * Index setting which used to determine how the data is cached locally fully or partially + */ + public static final Setting INDEX_STORE_LOCALITY_SETTING = new Setting<>( + "index.store.data_locality", + DataLocalityType.FULL.name(), + DataLocalityType::getValueOf, + Property.IndexScope, + Property.NodeScope + ); + public static final Setting INDEX_RECOVERY_TYPE_SETTING = new Setting<>( "index.recovery.type", "", @@ -297,6 +310,7 @@ public Iterator> settings() { private final AtomicBoolean frozen = new AtomicBoolean(false); private final BooleanSupplier allowExpensiveQueries; private final Map recoveryStateFactories; + private final FileCache fileCache; /** * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins @@ -315,7 +329,8 @@ public IndexModule( final Map directoryFactories, final BooleanSupplier allowExpensiveQueries, final IndexNameExpressionResolver expressionResolver, - final Map recoveryStateFactories + final Map recoveryStateFactories, + final FileCache fileCache ) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; @@ -327,6 +342,30 @@ public IndexModule( this.allowExpensiveQueries = allowExpensiveQueries; this.expressionResolver = expressionResolver; this.recoveryStateFactories = recoveryStateFactories; + this.fileCache = fileCache; + } + + public IndexModule( + final IndexSettings indexSettings, + final AnalysisRegistry analysisRegistry, + final EngineFactory engineFactory, + final EngineConfigFactory engineConfigFactory, + final Map directoryFactories, + final BooleanSupplier allowExpensiveQueries, + final IndexNameExpressionResolver expressionResolver, + final Map recoveryStateFactories + ) { + this( + indexSettings, + analysisRegistry, + engineFactory, + engineConfigFactory, + directoryFactories, + allowExpensiveQueries, + expressionResolver, + recoveryStateFactories, + null + ); } /** @@ -577,6 +616,40 @@ public boolean match(Settings settings) { } } + /** + * Indicates the locality of the data - whether it will be cached fully or partially + */ + public enum DataLocalityType { + /** + * Indicates that all the data will be cached locally + */ + FULL, + /** + * Indicates that only a subset of the data will be cached locally + */ + PARTIAL; + + private static final Map LOCALITY_TYPES; + + static { + final Map localityTypes = new HashMap<>(values().length); + for (final DataLocalityType dataLocalityType : values()) { + localityTypes.put(dataLocalityType.name(), dataLocalityType); + } + LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes); + } + + public static DataLocalityType getValueOf(final String localityType) { + Objects.requireNonNull(localityType, "No locality type given."); + final String localityTypeName = toRootUpperCase(localityType.trim()); + final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName); + if (type != null) { + return type; + } + throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "]."); + } + } + public static Type defaultStoreType(final boolean allowMmap) { if (allowMmap && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) { return Type.HYBRIDFS; @@ -665,7 +738,8 @@ public IndexService newIndexService( translogFactorySupplier, clusterDefaultRefreshIntervalSupplier, recoverySettings, - remoteStoreSettings + remoteStoreSettings, + fileCache ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index e501d7eff3f81..d021b290e585b 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -39,6 +39,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Accountable; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; @@ -55,6 +56,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.io.IOUtils; @@ -91,8 +93,11 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.index.store.CompositeDirectory; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.RemoteStoreSettings; @@ -188,6 +193,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final Supplier clusterDefaultRefreshIntervalSupplier; private final RecoverySettings recoverySettings; private final RemoteStoreSettings remoteStoreSettings; + private final FileCache fileCache; public IndexService( IndexSettings indexSettings, @@ -223,7 +229,8 @@ public IndexService( BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, RecoverySettings recoverySettings, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + FileCache fileCache ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -301,9 +308,85 @@ public IndexService( this.translogFactorySupplier = translogFactorySupplier; this.recoverySettings = recoverySettings; this.remoteStoreSettings = remoteStoreSettings; + this.fileCache = fileCache; updateFsyncTaskIfNecessary(); } + public IndexService( + IndexSettings indexSettings, + IndexCreationContext indexCreationContext, + NodeEnvironment nodeEnv, + NamedXContentRegistry xContentRegistry, + SimilarityService similarityService, + ShardStoreDeleter shardStoreDeleter, + IndexAnalyzers indexAnalyzers, + EngineFactory engineFactory, + EngineConfigFactory engineConfigFactory, + CircuitBreakerService circuitBreakerService, + BigArrays bigArrays, + ThreadPool threadPool, + ScriptService scriptService, + ClusterService clusterService, + Client client, + QueryCache queryCache, + IndexStorePlugin.DirectoryFactory directoryFactory, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, + IndexEventListener eventListener, + Function> wrapperFactory, + MapperRegistry mapperRegistry, + IndicesFieldDataCache indicesFieldDataCache, + List searchOperationListeners, + List indexingOperationListeners, + NamedWriteableRegistry namedWriteableRegistry, + BooleanSupplier idFieldDataEnabled, + BooleanSupplier allowExpensiveQueries, + IndexNameExpressionResolver expressionResolver, + ValuesSourceRegistry valuesSourceRegistry, + IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, + BiFunction translogFactorySupplier, + Supplier clusterDefaultRefreshIntervalSupplier, + RecoverySettings recoverySettings, + RemoteStoreSettings remoteStoreSettings + ) { + this( + indexSettings, + indexCreationContext, + nodeEnv, + xContentRegistry, + similarityService, + shardStoreDeleter, + indexAnalyzers, + engineFactory, + engineConfigFactory, + circuitBreakerService, + bigArrays, + threadPool, + scriptService, + clusterService, + client, + queryCache, + directoryFactory, + remoteDirectoryFactory, + eventListener, + wrapperFactory, + mapperRegistry, + indicesFieldDataCache, + searchOperationListeners, + indexingOperationListeners, + namedWriteableRegistry, + idFieldDataEnabled, + allowExpensiveQueries, + expressionResolver, + valuesSourceRegistry, + recoveryStateFactory, + translogFactorySupplier, + clusterDefaultRefreshIntervalSupplier, + recoverySettings, + remoteStoreSettings, + null + ); + } + static boolean needsMapperService(IndexSettings indexSettings, IndexCreationContext indexCreationContext) { return false == (indexSettings.getIndexMetadata().getState() == IndexMetadata.State.CLOSE && indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service @@ -495,9 +578,9 @@ public synchronized IndexShard createShard( } }; Store remoteStore = null; + Directory remoteDirectory = null; boolean seedRemote = false; if (targetNode.isRemoteStoreNode()) { - final Directory remoteDirectory; if (this.indexSettings.isRemoteStoreEnabled()) { remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); } else { @@ -530,7 +613,33 @@ public synchronized IndexShard createShard( } } - Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory directory = null; + if (FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING) && + // TODO : Need to remove this check after support for hot indices is added in Composite Directory + this.indexSettings.isStoreLocalityPartial()) { + /* + Currently Composite Directory only supports local directory to be of type FSDirectory + The reason is that FileCache currently has it key type as Path + Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache + TODO : Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion + */ + Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path); + + if (localDirectory instanceof FSDirectory == false) throw new IllegalStateException( + "For Composite Directory, local directory must be of type FSDirectory" + ); + else if (fileCache == null) throw new IllegalStateException( + "File Cache not initialized on this Node, cannot create Composite Directory without FileCache" + ); + else if (remoteDirectory == null) throw new IllegalStateException( + "Remote Directory must not be null for Composite Directory" + ); + + directory = new CompositeDirectory((FSDirectory) localDirectory, (RemoteSegmentStoreDirectory) remoteDirectory, fileCache); + } else { + directory = directoryFactory.newDirectory(this.indexSettings, path); + } + store = new Store( shardId, this.indexSettings, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 9d8ab6815eecc..27d194b59bc82 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -725,6 +725,7 @@ public static IndexMergePolicy fromString(String text) { private final int numberOfShards; private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; + private final boolean isStoreLocalityPartial; private volatile TimeValue remoteTranslogUploadBufferInterval; private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; @@ -914,6 +915,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); + isStoreLocalityPartial = settings.get( + IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), + IndexModule.DataLocalityType.FULL.toString() + ).equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString()); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY); @@ -1271,6 +1276,13 @@ public String getRemoteStoreTranslogRepository() { return remoteStoreTranslogRepository; } + /** + * Returns true if the store locality is partial + */ + public boolean isStoreLocalityPartial() { + return isStoreLocalityPartial; + } + /** * Returns true if this is remote/searchable snapshot */ diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 18d4a2ca6d639..11f12a5dcbbf0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5022,6 +5022,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException { boolean syncSegmentSuccess = false; + + boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false; long startTimeMs = System.currentTimeMillis(); assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded(); logger.trace("Downloading segments from remote segment store"); @@ -5044,7 +5046,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex()); for (String file : uploadedSegments.keySet()) { long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); - if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { + if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) { recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false); } else { recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true); @@ -5053,7 +5055,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn } else { storeDirectory = store.directory(); } - copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync); + if (indexSettings.isStoreLocalityPartial() == false) { + copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync); + } if (remoteSegmentMetadata != null) { final SegmentInfos infosSnapshot = store.buildSegmentInfos( @@ -5063,13 +5067,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes. // Extra segments will be wiped on engine open. - for (String file : List.of(store.directory().listAll())) { - if (file.startsWith(IndexFileNames.SEGMENTS)) { - store.deleteQuiet(file); + if (indexSettings.isStoreLocalityPartial() == false) { + for (String file : List.of(store.directory().listAll())) { + if (file.startsWith(IndexFileNames.SEGMENTS)) { + store.deleteQuiet(file); + } } + assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() + : "There should not be any segments file in the dir"; } - assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() - : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } syncSegmentSuccess = true; diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 351aec6e3af6c..ca9d7379de4c3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -30,6 +30,7 @@ import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.CompositeDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.Translog; @@ -249,6 +250,12 @@ private boolean syncSegments() { Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Collection segmentsToRefresh = localSegmentsPostRefresh.stream() + .filter(file -> !skipUpload(file)) + .collect(Collectors.toList()); + Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate(); + CountDownLatch latch = new CountDownLatch(1); ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { @Override @@ -270,6 +277,9 @@ public void onResponse(Void unused) { // At this point since we have uploaded new segments, segment infos and segment metadata file, // along with marking minSeqNoToKeep, upload has succeeded completely. successful.set(true); + if (directory instanceof CompositeDirectory) { + ((CompositeDirectory) directory).afterSyncToRemote(segmentsToRefresh); + } } catch (Exception e) { // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried // as part of exponential back-off retry logic. This should not affect durability of the indexed data diff --git a/server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java b/server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java new file mode 100644 index 0000000000000..3a4309fe6ee6d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lucene.store.FilterIndexOutput; + +import java.io.IOException; + +/** + * FilterIndexOutput which takes in an additional FunctionalInterface as a parameter to perform required operations once the IndexOutput is closed + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CloseableFilterIndexOutput extends FilterIndexOutput { + + /** + * Functional Interface which takes the name of the file as input on which the required operations are to be performed + */ + @FunctionalInterface + public interface OnCloseListener { + void onClose(String name) throws IOException; + } + + private final OnCloseListener onCloseListener; + private final String fileName; + + public CloseableFilterIndexOutput(IndexOutput out, String fileName, OnCloseListener onCloseListener) { + super("CloseableFilterIndexOutput for file " + fileName, out); + this.fileName = fileName; + this.onCloseListener = onCloseListener; + } + + @Override + public void close() throws IOException { + super.close(); + onCloseListener.onClose(fileName); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java new file mode 100644 index 0000000000000..d4c5c7cce9c51 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -0,0 +1,290 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Version; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; +import org.opensearch.index.store.remote.filecache.CachedIndexInput; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInput; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.index.store.remote.utils.BlockIOContext; +import org.opensearch.index.store.remote.utils.FileType; +import org.opensearch.index.store.remote.utils.TransferManager; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Composite Directory will contain both local and remote directory + * Consumers of Composite directory need not worry whether file is in local or remote + * All such abstractions will be handled by the Composite directory itself + * Implements all required methods by Directory abstraction + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDirectory extends FilterDirectory { + private static final Logger logger = LogManager.getLogger(CompositeDirectory.class); + private final FSDirectory localDirectory; + private final RemoteSegmentStoreDirectory remoteDirectory; + private final FileCache fileCache; + private final TransferManager transferManager; + + /** + * Constructor to initialise the composite directory + * @param localDirectory corresponding to the local FSDirectory + * @param remoteDirectory corresponding to the remote directory + * @param fileCache used to cache the remote files locally + */ + public CompositeDirectory(FSDirectory localDirectory, RemoteSegmentStoreDirectory remoteDirectory, FileCache fileCache) { + super(localDirectory); + this.localDirectory = localDirectory; + this.remoteDirectory = remoteDirectory; + this.fileCache = fileCache; + transferManager = new TransferManager( + (name, position, length) -> new InputStreamIndexInput( + remoteDirectory.openInput(name, new BlockIOContext(IOContext.DEFAULT, position, length)), + length + ), + fileCache + ); + } + + /** + * Returns names of all files stored in this directory in sorted order + * Does not include locally stored block files (having _block_ in their names) + * + * @throws IOException in case of I/O error + */ + @Override + public String[] listAll() throws IOException { + logger.trace("listAll() called"); + String[] localFiles = localDirectory.listAll(); + logger.trace("Local Directory files : {}", () -> Arrays.toString(localFiles)); + Set allFiles = new HashSet<>(Arrays.asList(localFiles)); + String[] remoteFiles = getRemoteFiles(); + allFiles.addAll(Arrays.asList(remoteFiles)); + logger.trace("Remote Directory files : {}", () -> Arrays.toString(remoteFiles)); + Set localLuceneFiles = allFiles.stream() + .filter(file -> !FileType.isBlockFile(file)) + .collect(Collectors.toUnmodifiableSet()); + String[] files = new String[localLuceneFiles.size()]; + localLuceneFiles.toArray(files); + Arrays.sort(files); + logger.trace("listAll() returns : {}", () -> Arrays.toString(files)); + return files; + } + + /** + * Removes an existing file in the directory. + * Currently deleting only from local directory as files from remote should not be deleted as that is taken care by garbage collection logic of remote directory + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ + @Override + public void deleteFile(String name) throws IOException { + logger.trace("deleteFile() called {}", name); + if (isTempFile(name)) { + localDirectory.deleteFile(name); + } else { + /* + Not deleting from localDirectory directly since it causes a race condition when the localDirectory deletes a file, and it ends up in pendingDeletion state. + Meanwhile, fileCache on removal deletes the file directly via the Files class and later when the directory tries to delete the files pending for deletion (which happens before creating a new file), it causes NoSuchFileException and new file creation fails + */ + fileCache.remove(localDirectory.getDirectory().resolve(name)); + } + } + + /** + * Returns the byte length of a file in the directory. + * Throws {@link NoSuchFileException} or {@link FileNotFoundException} in case file is not present locally and in remote as well + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ + @Override + public long fileLength(String name) throws IOException { + logger.trace("fileLength() called {}", name); + long fileLength; + Path key = localDirectory.getDirectory().resolve(name); + if (isTempFile(name) || fileCache.get(key) != null) { + try { + fileLength = localDirectory.fileLength(name); + logger.trace("fileLength from Local {}", fileLength); + } finally { + fileCache.decRef(key); + } + } else { + fileLength = remoteDirectory.fileLength(name); + logger.trace("fileLength from Remote {}", fileLength); + } + return fileLength; + } + + /** + * Creates a new, empty file in the directory and returns an {@link IndexOutput} instance for + * appending data to this file. + * @param name the name of the file to create. + * @throws IOException in case of I/O error + */ + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + logger.trace("createOutput() called {}", name); + // The CloseableFilterIndexOutput will ensure that the file is added to FileCache once write is completed on this file + return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, this::cacheFile); + } + + /** + * Ensures that any writes to these files are moved to stable storage (made durable). + * @throws IOException in case of I/O error + */ + @Override + public void sync(Collection names) throws IOException { + logger.trace("sync() called {}", names); + Collection remoteFiles = Arrays.asList(getRemoteFiles()); + Collection filesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).collect(Collectors.toList()); + logger.trace("Synced files : {}", filesToSync); + localDirectory.sync(filesToSync); + } + + /** + * Renames {@code source} file to {@code dest} file where {@code dest} must not already exist in + * the directory. + * @throws IOException in case of I/O error + */ + @Override + public void rename(String source, String dest) throws IOException { + logger.trace("rename() called {}, {}", source, dest); + localDirectory.rename(source, dest); + fileCache.remove(localDirectory.getDirectory().resolve(source)); + cacheFile(dest); + fileCache.decRef(localDirectory.getDirectory().resolve(dest)); + } + + /** + * Opens a stream for reading an existing file. + * Check whether the file is present locally or in remote and return the IndexInput accordingly + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + logger.trace("openInput() called {}", name); + // We aren't tracking temporary files (created via createTempOutput) currently in FileCache as these are created and then deleted + // within a very short span of time + // We will be reading them directory from the local directory + if (isTempFile(name)) { + return localDirectory.openInput(name, context); + } + // Return directly from the FileCache (via TransferManager) if complete file is present + Path key = localDirectory.getDirectory().resolve(name); + CachedIndexInput indexInput = fileCache.get(key); + if (indexInput != null) { + logger.trace("Complete file found in FileCache"); + try { + return indexInput.getIndexInput().clone(); + } finally { + fileCache.decRef(key); + } + } + // If file has been uploaded to the Remote Store, fetch it from the Remote Store in blocks via OnDemandCompositeBlockIndexInput + else { + logger.trace("Complete file not in FileCache, to be fetched in Blocks from Remote {}", name); + RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteDirectory.getSegmentsUploadedToRemoteStore().get(name); + // TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific + BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo( + name, + new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST), + null + ); + return new OnDemandBlockSnapshotIndexInput(fileInfo, localDirectory, transferManager); + } + } + + /** + * Closes the directory + * @throws IOException in case of I/O error + */ + @Override + public void close() throws IOException { + Arrays.stream(localDirectory.listAll()).forEach(f -> fileCache.remove(localDirectory.getDirectory().resolve(f))); + localDirectory.close(); + } + + /** + * Function to perform operations once files have been uploaded to Remote Store + * Currently deleting the local files here, as once uploaded to Remote, local files become eligible for eviction from FileCache + * @param files : recent files which have been successfully uploaded to Remote Store + * @throws IOException in case of I/O error + */ + public void afterSyncToRemote(Collection files) throws IOException { + logger.trace("afterSyncToRemote called for {}", files); + for (String fileName : files) { + /* + Decrementing the refCount here for the path so that it becomes eligible for eviction + This is a temporary solution until pinning support is added + TODO - Unpin the files here from FileCache so that they become eligible for eviction, once pinning/unpinning support is added in FileCache + Uncomment the below commented line(to remove the file from cache once uploaded) to test block based functionality + */ + logger.trace("File {} uploaded to Remote Store and now can be eligible for eviction in FileCache", fileName); + fileCache.decRef(localDirectory.getDirectory().resolve(fileName)); + // fileCache.remove(localDirectory.getDirectory().resolve(fileName)); + } + } + + private boolean isTempFile(String name) { + return name.endsWith(".tmp"); + } + + /** + * Return the list of files present in Remote + */ + private String[] getRemoteFiles() throws IOException { + String[] remoteFiles; + try { + remoteFiles = remoteDirectory.listAll(); + } catch (NullPointerException e) { + /* + We can encounter NPE when no data has been uploaded to remote store yet and as a result the metadata is empty + Empty metadata means that there are no files currently in remote, hence returning an empty list in this scenario + TODO : Catch the NPE in listAll of RemoteSegmentStoreDirectory itself instead of catching here + */ + remoteFiles = new String[0]; + } + return remoteFiles; + } + + private void cacheFile(String name) throws IOException { + Path filePath = localDirectory.getDirectory().resolve(name); + // put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote + // so that it can be evicted after that + // this is just a temporary solution, will pin the file once support for that is added in FileCache + // TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been + // successfully uploaded to Remote + fileCache.put(filePath, new FullFileCachedIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.READ))); + } + +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 345583bbbd1be..9ed64501600c1 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -28,8 +28,10 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; import org.opensearch.index.store.exception.ChecksumCombinationException; +import org.opensearch.index.store.remote.utils.BlockIOContext; import java.io.FileNotFoundException; import java.io.IOException; @@ -198,10 +200,20 @@ public IndexInput openInput(String name, IOContext context) throws IOException { public IndexInput openInput(String name, long fileLength, IOContext context) throws IOException { InputStream inputStream = null; try { - inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); + if (context instanceof BlockIOContext) { + long position = ((BlockIOContext) context).getBlockStart(); + long length = ((BlockIOContext) context).getBlockSize(); + inputStream = blobContainer.readBlob(name, position, length); + // TODO - Explore how we can buffer small chunks of data instead of having the whole 8MB block in memory + byte[] bytes = downloadRateLimiter.apply(inputStream).readAllBytes(); + inputStream.close(); + return new ByteArrayIndexInput(name, bytes); + } else { + inputStream = blobContainer.readBlob(name); + return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); + } } catch (Exception e) { - // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. + // In case the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. if (inputStream != null) { try { inputStream.close(); diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java index 7cfa738e75e52..177f0526e7571 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java @@ -94,7 +94,7 @@ private Future createRemoteSnapshotDirectoryFromSnapsho assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot : "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot"; final BlobStoreIndexShardSnapshot snapshot = (BlobStoreIndexShardSnapshot) indexShardSnapshot; - TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache); + TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache); return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager); }); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 8097fd08da50a..ad56127394779 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.file; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IndexInput; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -26,6 +28,7 @@ * @opensearch.internal */ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput { + private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class); /** * Where this class fetches IndexInput parts from */ @@ -133,10 +136,19 @@ protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, lo @Override protected IndexInput fetchBlock(int blockId) throws IOException { - final String blockFileName = fileName + "." + blockId; + logger.trace("fetchBlock called with blockId -> {}", blockId); + final String blockFileName = fileName + "_block_" + blockId; final long blockStart = getBlockStart(blockId); final long blockEnd = blockStart + getActualBlockSize(blockId); + logger.trace( + "File: {} , Block File: {} , BlockStart: {} , BlockEnd: {} , OriginalFileSize: {}", + fileName, + blockFileName, + blockStart, + blockEnd, + originalFileSize + ); // Block may be present on multiple chunks of a file, so we need // to fetch each chunk/blob part separately to fetch an entire block. diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java index 7d7c40be3a833..200a47e661ab4 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java @@ -133,8 +133,9 @@ public FileCachedIndexInput clone() { @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - // never reach here! - throw new UnsupportedOperationException("FileCachedIndexInput couldn't be sliced."); + IndexInput slicedIndexInput = luceneIndexInput.slice(sliceDescription, offset, length); + cache.incRef(filePath); + return new FileCachedIndexInput(cache, filePath, slicedIndexInput, true); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java new file mode 100644 index 0000000000000..f8aed0432cba8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.annotation.ExperimentalApi; + +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implementation of the CachedIndexInput for NON_BLOCK files which takes in an IndexInput as parameter + * + * @opensearch.experimental + */ +@ExperimentalApi +public class FullFileCachedIndexInput implements CachedIndexInput { + private final FileCache fileCache; + private final Path path; + private final FileCachedIndexInput fileCachedIndexInput; + private final AtomicBoolean isClosed; + + /** + * Constructor - takes IndexInput as parameter + */ + public FullFileCachedIndexInput(FileCache fileCache, Path path, IndexInput indexInput) { + this.fileCache = fileCache; + this.path = path; + fileCachedIndexInput = new FileCachedIndexInput(fileCache, path, indexInput); + isClosed = new AtomicBoolean(false); + } + + /** + * Returns the wrapped indexInput + */ + @Override + public IndexInput getIndexInput() { + if (isClosed.get()) throw new AlreadyClosedException("Index input is already closed"); + return fileCachedIndexInput; + } + + /** + * Returns the length of the wrapped indexInput + */ + @Override + public long length() { + return fileCachedIndexInput.length(); + } + + /** + * Checks if the wrapped indexInput is closed + */ + @Override + public boolean isClosed() { + return isClosed.get(); + } + + /** + * Closes the wrapped indexInput + */ + @Override + public void close() throws Exception { + if (!isClosed.getAndSet(true)) { + fileCachedIndexInput.close(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java new file mode 100644 index 0000000000000..a78dd85d6f194 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.apache.lucene.store.IOContext; +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * BlockIOContext is an extension of IOContext which can be used to pass block related information to the openInput() method of any directory + * + * @opensearch.experimental + */ +@ExperimentalApi +public class BlockIOContext extends IOContext { + + private long blockStart; + private long blockSize; + + /** + * Constructor to initialise BlockIOContext with block related information + */ + public BlockIOContext(IOContext ctx, long blockStart, long blockSize) { + super(ctx.context); + verifyBlockStartAndSize(blockStart, blockSize); + this.blockStart = blockStart; + this.blockSize = blockSize; + } + + /** + * Getter for blockStart + */ + public long getBlockStart() { + return blockStart; + } + + /** + * Getter for blockSize + */ + public long getBlockSize() { + return blockSize; + } + + private void verifyBlockStartAndSize(long blockStart, long blockSize) { + if (blockStart < 0) throw new IllegalArgumentException("blockStart must be greater than or equal to 0"); + if (blockSize <= 0) throw new IllegalArgumentException(("blockSize must be greater than 0")); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java b/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java new file mode 100644 index 0000000000000..e340c82ba9ba3 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/FileType.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Enum to represent whether a file is block or not + * + * @opensearch.experimental + */ +@ExperimentalApi +public enum FileType { + /** + * Block file + */ + BLOCK(".*_block_.*"), + /** + * Full file - Non-Block + */ + FULL(".*"); + + private final String pattern; + + FileType(String pattern) { + this.pattern = pattern; + } + + /** + * Returns if the fileType is a block file or not + */ + public static boolean isBlockFile(FileType fileType) { + return fileType.equals(FileType.BLOCK); + } + + /** + * Returns if the fileName is block file or not + */ + public static boolean isBlockFile(String fileName) { + if (fileName.matches(FileType.BLOCK.pattern)) return true; + return false; + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 98cad7bfadb09..df26f2f0925f6 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; @@ -39,11 +38,19 @@ public class TransferManager { private static final Logger logger = LogManager.getLogger(TransferManager.class); - private final BlobContainer blobContainer; + /** + * Functional interface to get an InputStream for a file at a certain offset and size + */ + @FunctionalInterface + public interface StreamReader { + InputStream read(String name, long position, long length) throws IOException; + } + + private final StreamReader streamReader; private final FileCache fileCache; - public TransferManager(final BlobContainer blobContainer, final FileCache fileCache) { - this.blobContainer = blobContainer; + public TransferManager(final StreamReader streamReader, final FileCache fileCache) { + this.streamReader = streamReader; this.fileCache = fileCache; } @@ -55,12 +62,15 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { final Path key = blobFetchRequest.getFilePath(); + logger.trace("fetchBlob called for {}", key.toString()); final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { if (cachedIndexInput == null || cachedIndexInput.isClosed()) { + logger.trace("Transfer Manager - IndexInput closed or not in cache"); // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequest); + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); } else { + logger.trace("Transfer Manager - Already in cache"); // already in the cache and ready to be used (open) return cachedIndexInput; } @@ -77,7 +87,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio } @SuppressWarnings("removal") - private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { + private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { // We need to do a privileged action here in order to fetch from remote // and write to the local file cache in case this is invoked as a side // effect of a plugin (such as a scripted search) that doesn't have the @@ -85,13 +95,14 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo return AccessController.doPrivileged((PrivilegedAction) () -> { try { if (Files.exists(request.getFilePath()) == false) { + logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); try ( OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) ) { for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { try ( - InputStream snapshotFileInputStream = blobContainer.readBlob( + InputStream snapshotFileInputStream = streamReader.read( blobPart.getBlobName(), blobPart.getPosition(), blobPart.getLength() @@ -119,15 +130,15 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo */ private static class DelayedCreationCachedIndexInput implements CachedIndexInput { private final FileCache fileCache; - private final BlobContainer blobContainer; + private final StreamReader streamReader; private final BlobFetchRequest request; private final CompletableFuture result = new CompletableFuture<>(); private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private DelayedCreationCachedIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { + private DelayedCreationCachedIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { this.fileCache = fileCache; - this.blobContainer = blobContainer; + this.streamReader = streamReader; this.request = request; } @@ -139,7 +150,7 @@ public IndexInput getIndexInput() throws IOException { if (isStarted.getAndSet(true) == false) { // We're the first one here, need to download the block try { - result.complete(createIndexInput(fileCache, blobContainer, request)); + result.complete(createIndexInput(fileCache, streamReader, request)); } catch (Exception e) { result.completeExceptionally(e); fileCache.remove(request.getFilePath()); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 251be8a990055..3f8a3d20ce12d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -136,6 +136,7 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.TranslogFactory; @@ -354,6 +355,7 @@ public class IndicesService extends AbstractLifecycleComponent private final BiFunction translogFactorySupplier; private volatile TimeValue clusterDefaultRefreshInterval; private final SearchRequestStats searchRequestStats; + private final FileCache fileCache; @Override protected void doStart() { @@ -388,7 +390,8 @@ public IndicesService( @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, RecoverySettings recoverySettings, CacheService cacheService, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + FileCache fileCache ) { this.settings = settings; this.threadPool = threadPool; @@ -495,6 +498,68 @@ protected void closeInternal() { .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate); this.recoverySettings = recoverySettings; this.remoteStoreSettings = remoteStoreSettings; + this.fileCache = fileCache; + } + + public IndicesService( + Settings settings, + PluginsService pluginsService, + NodeEnvironment nodeEnv, + NamedXContentRegistry xContentRegistry, + AnalysisRegistry analysisRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + MapperRegistry mapperRegistry, + NamedWriteableRegistry namedWriteableRegistry, + ThreadPool threadPool, + IndexScopedSettings indexScopedSettings, + CircuitBreakerService circuitBreakerService, + BigArrays bigArrays, + ScriptService scriptService, + ClusterService clusterService, + Client client, + MetaStateService metaStateService, + Collection>> engineFactoryProviders, + Map directoryFactories, + ValuesSourceRegistry valuesSourceRegistry, + Map recoveryStateFactories, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, + Supplier repositoriesServiceSupplier, + SearchRequestStats searchRequestStats, + @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + RecoverySettings recoverySettings, + CacheService cacheService, + RemoteStoreSettings remoteStoreSettings + ) { + this( + settings, + pluginsService, + nodeEnv, + xContentRegistry, + analysisRegistry, + indexNameExpressionResolver, + mapperRegistry, + namedWriteableRegistry, + threadPool, + indexScopedSettings, + circuitBreakerService, + bigArrays, + scriptService, + clusterService, + client, + metaStateService, + engineFactoryProviders, + directoryFactories, + valuesSourceRegistry, + recoveryStateFactories, + remoteDirectoryFactory, + repositoriesServiceSupplier, + searchRequestStats, + remoteStoreStatsTrackerFactory, + recoverySettings, + cacheService, + remoteStoreSettings, + null + ); } /** @@ -873,7 +938,8 @@ private synchronized IndexService createIndexService( directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, - recoveryStateFactories + recoveryStateFactories, + fileCache ); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); @@ -963,7 +1029,8 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, - recoveryStateFactories + recoveryStateFactories, + fileCache ); pluginsService.onIndexModule(indexModule); return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService); diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b06b3e0497cf7..0bc762d1ec6de 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -117,9 +117,13 @@ public void getSegmentFiles( final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; + assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } + if(indexShard.indexSettings().isStoreLocalityPartial()) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + return; + } indexShard.getFileDownloader() .downloadAsync( cancellableThreads, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index af764556b7549..7e9211233e358 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import java.util.Map; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; @@ -169,15 +170,21 @@ public void startReplication(ActionListener listener) { state.setStage(SegmentReplicationState.Stage.REPLICATING); final StepListener checkpointInfoListener = new StepListener<>(); final StepListener getFilesListener = new StepListener<>(); - + Map replicaMd = null; + try { + replicaMd = indexShard.getSegmentMetadataMap(); + } catch (IOException e) { + listener.onFailure(new RuntimeException(e)); + } logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description())); // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); cancellableThreads.checkForCancel(); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + Map finalReplicaMd = replicaMd; checkpointInfoListener.whenComplete(checkpointInfo -> { - final List filesToFetch = getFiles(checkpointInfo); + final List filesToFetch = getFiles(checkpointInfo, finalReplicaMd); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); source.getSegmentFiles( @@ -196,31 +203,36 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); } - private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { + private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); - final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd); // local files - final Set localFiles = Set.of(indexShard.store().directory().listAll()); - // set of local files that can be reused - final Set reuseFiles = diff.missing.stream() - .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) - .filter(this::validateLocalChecksum) - .map(StoreFileMetadata::name) - .collect(Collectors.toSet()); - - final List missingFiles = diff.missing.stream() - .filter(md -> reuseFiles.contains(md.name()) == false) - .collect(Collectors.toList()); + final List missingFiles; + // Skip reuse logic for warm indices + if (indexShard.indexSettings().isStoreLocalityPartial() == true) { + missingFiles = diff.missing; + } else { + final Set localFiles = Set.of(indexShard.store().directory().listAll()); + // set of local files that can be reused + final Set reuseFiles = diff.missing.stream() + .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) + .filter(this::validateLocalChecksum) + .map(StoreFileMetadata::name) + .collect(Collectors.toSet()); + missingFiles = diff.missing.stream() + .filter(md -> reuseFiles.contains(md.name()) == false) + .collect(Collectors.toList()); - logger.trace( - () -> new ParameterizedMessage( - "Replication diff for checkpoint {} {} {}", - checkpointInfo.getCheckpoint(), - missingFiles, - diff.different - ) - ); + logger.trace( + () -> new ParameterizedMessage( + "Replication diff for checkpoint {} {} {}", + checkpointInfo.getCheckpoint(), + missingFiles, + diff.different + ) + ); + } /* * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 614f39166ea66..11ba0ff34d1a8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -840,7 +840,8 @@ protected Node( remoteStoreStatsTrackerFactory, recoverySettings, cacheService, - remoteStoreSettings + remoteStoreSettings, + fileCache ); final IngestService ingestService = new IngestService( @@ -1948,7 +1949,8 @@ DiscoveryNode getNode() { * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. */ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException { - if (DiscoveryNode.isSearchNode(settings)) { + boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING); + if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) { NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath(); long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath)); @@ -1957,7 +1959,10 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set. if (capacity == 0) { // If node is not a dedicated search node without configuration, prevent cache initialization - if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { + if (!isWritableRemoteIndexEnabled + && DiscoveryNode.getRolesFromSettings(settings) + .stream() + .anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { throw new SettingsException( "Unable to initialize the " + DiscoveryNodeRole.SEARCH_ROLE.roleName() diff --git a/server/src/test/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectoryTests.java new file mode 100644 index 0000000000000..ff9b62a341deb --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectoryTests.java @@ -0,0 +1,178 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.index.SegmentInfos; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; +import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; +import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BaseRemoteSegmentStoreDirectoryTests extends IndexShardTestCase { + + protected RemoteDirectory remoteDataDirectory; + protected RemoteDirectory remoteMetadataDirectory; + protected RemoteStoreMetadataLockManager mdLockManager; + protected RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + protected TestUploadListener testUploadTracker; + protected IndexShard indexShard; + protected SegmentInfos segmentInfos; + protected ThreadPool threadPool; + + protected final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 1, + 1, + "node-1" + ); + + protected final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 2, + 1, + "node-2" + ); + protected final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 13, + 34, + 1, + 1, + "node-1" + ); + protected final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 10, + 38, + 34, + 1, + 1, + "node-1" + ); + protected final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 10, + 36, + 34, + 1, + 1, + "node-1" + ); + + public void setupRemoteSegmentStoreDirectory() throws IOException { + remoteDataDirectory = mock(RemoteDirectory.class); + remoteMetadataDirectory = mock(RemoteDirectory.class); + mdLockManager = mock(RemoteStoreMetadataLockManager.class); + threadPool = mock(ThreadPool.class); + testUploadTracker = new TestUploadListener(); + + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool, + indexShard.shardId() + ); + try (Store store = indexShard.store()) { + segmentInfos = store.readLastCommittedSegmentsInfo(); + } + + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); + when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); + when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(executorService); + } + + protected Map> populateMetadata() throws IOException { + List metadataFiles = new ArrayList<>(); + + metadataFiles.add(metadataFilename); + metadataFiles.add(metadataFilename2); + metadataFiles.add(metadataFilename3); + + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + METADATA_FILES_TO_FETCH + ) + ).thenReturn(List.of(metadataFilename)); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + Integer.MAX_VALUE + ) + ).thenReturn(metadataFiles); + + Map> metadataFilenameContentMapping = Map.of( + metadataFilename, + getDummyMetadata("_0", 1), + metadataFilename2, + getDummyMetadata("_0", 1), + metadataFilename3, + getDummyMetadata("_0", 1) + ); + + when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenAnswer( + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) + ); + when(remoteMetadataDirectory.getBlobStream(metadataFilename2)).thenAnswer( + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename2), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) + ); + when(remoteMetadataDirectory.getBlobStream(metadataFilename3)).thenAnswer( + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename3), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) + ); + + return metadataFilenameContentMapping; + } + + @After + public void tearDown() throws Exception { + indexShard.close("test tearDown", true, false); + super.tearDown(); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java new file mode 100644 index 0000000000000..64649978129c4 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; +import org.opensearch.index.store.remote.filecache.CachedIndexInput; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInput; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTests { + private FileCache fileCache; + private FSDirectory localDirectory; + private CompositeDirectory compositeDirectory; + + @Before + public void setup() throws IOException { + setupRemoteSegmentStoreDirectory(); + localDirectory = mock(FSDirectory.class); + fileCache = mock(FileCache.class); + compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache); + } + + public void testListAll() throws IOException { + when(localDirectory.listAll()).thenReturn(new String[]{}); + String[] actualFileNames = compositeDirectory.listAll(); + String[] expectedFileNames = new String[] {}; + assertArrayEquals(expectedFileNames, actualFileNames); + + populateMetadata(); + when(localDirectory.listAll()).thenReturn(new String[] { "_1.cfe", "_2.cfe", "_0.cfe_block_7", "_0.cfs_block_7" }); + + actualFileNames = compositeDirectory.listAll(); + expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1" }; + assertArrayEquals(expectedFileNames, actualFileNames); + } + + public void testDeleteFile() throws IOException { + Path basePath = mock(Path.class); + Path resolvedPath = mock(Path.class); + when(basePath.resolve(anyString())).thenReturn(resolvedPath); + when(localDirectory.getDirectory()).thenReturn(basePath); + + compositeDirectory.deleteFile("_0.tmp"); + verify(localDirectory).deleteFile("_0.tmp"); + + compositeDirectory.deleteFile("_0.si"); + verify(fileCache).remove(resolvedPath); + } + + public void testFileLength() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + Path basePath = mock(Path.class); + Path resolvedPath = mock(Path.class); + when(basePath.resolve("_0.si")).thenReturn(resolvedPath); + when(localDirectory.getDirectory()).thenReturn(basePath); + when(localDirectory.fileLength("_0.si")).thenReturn(7L); + + // File present locally + CachedIndexInput indexInput = mock(CachedIndexInput.class); + when(fileCache.get(resolvedPath)).thenReturn(indexInput); + assertEquals(compositeDirectory.fileLength("_0.si"), 7L); + verify(localDirectory).fileLength(startsWith("_0.si")); + + // File not present locally + Map uploadedSegments = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + assertTrue(uploadedSegments.containsKey("_0.si")); + when(fileCache.get(resolvedPath)).thenReturn(null); + assertEquals(compositeDirectory.fileLength("_0.si"), uploadedSegments.get("_0.si").getLength()); + } + + public void testCreateOutput() throws IOException { + IndexOutput indexOutput = mock(IndexOutput.class); + when(localDirectory.createOutput("_0.si", IOContext.DEFAULT)).thenReturn(indexOutput); + IndexOutput actualIndexOutput = compositeDirectory.createOutput("_0.si", IOContext.DEFAULT); + assert actualIndexOutput instanceof CloseableFilterIndexOutput; + verify(localDirectory).createOutput("_0.si", IOContext.DEFAULT); + } + + public void testSync() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + Collection names = List.of("_0.cfe", "_0.cfs", "_1.cfe", "_1.cfs", "_2.nvm", "segments_1"); + compositeDirectory.sync(names); + verify(localDirectory).sync(List.of("_1.cfe", "_1.cfs", "_2.nvm")); + } + + public void testRename() throws IOException { + Path basePath = mock(Path.class); + Path resolvedPathOldFile = mock(Path.class); + Path resolvedPathNewFile = mock(Path.class); + when(basePath.resolve("old_file_name")).thenReturn(resolvedPathOldFile); + when(basePath.resolve("new_file_name")).thenReturn(resolvedPathNewFile); + when(localDirectory.getDirectory()).thenReturn(basePath); + CachedIndexInput indexInput = mock(CachedIndexInput.class); + when(fileCache.get(resolvedPathNewFile)).thenReturn(indexInput); + compositeDirectory.rename("old_file_name", "new_file_name"); + verify(localDirectory).rename("old_file_name", "new_file_name"); + verify(fileCache).remove(resolvedPathOldFile); + verify(fileCache).put(eq(resolvedPathNewFile), any(FullFileCachedIndexInput.class)); + } + + public void testOpenInput() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + Path basePath = mock(Path.class); + Path resolvedPathInCache = mock(Path.class); + Path resolvedPathNotInCache = mock(Path.class); + when(basePath.resolve("_0.si")).thenReturn(resolvedPathInCache); + when(basePath.resolve("_0.cfs")).thenReturn(resolvedPathNotInCache); + when(localDirectory.getDirectory()).thenReturn(basePath); + CachedIndexInput cachedIndexInput = mock(CachedIndexInput.class); + IndexInput localIndexInput = mock(IndexInput.class); + IndexInput indexInput = mock(IndexInput.class); + when(fileCache.get(resolvedPathInCache)).thenReturn(cachedIndexInput); + when(fileCache.compute(eq(resolvedPathInCache), any())).thenReturn(cachedIndexInput); + when(cachedIndexInput.getIndexInput()).thenReturn(indexInput); + when(indexInput.clone()).thenReturn(indexInput); + when(fileCache.get(resolvedPathNotInCache)).thenReturn(null); + + // Temp file, read directly form local directory + when(localDirectory.openInput("_0.tmp", IOContext.DEFAULT)).thenReturn(localIndexInput); + assertEquals(compositeDirectory.openInput("_0.tmp", IOContext.DEFAULT), localIndexInput); + verify(localDirectory).openInput("_0.tmp", IOContext.DEFAULT); + + // File present in file cache + assertEquals(compositeDirectory.openInput("_0.si", IOContext.DEFAULT), indexInput); + + // File present in Remote + IndexInput indexInput1 = compositeDirectory.openInput("_0.cfs", IOContext.DEFAULT); + assert indexInput1 instanceof OnDemandBlockSnapshotIndexInput; + } + + public void testClose() throws IOException { + Path basePath = mock(Path.class); + Path resolvedPath1 = mock(Path.class); + Path resolvedPath2 = mock(Path.class); + when(basePath.resolve("_0.si")).thenReturn(resolvedPath1); + when(basePath.resolve("_0.cfs")).thenReturn(resolvedPath2); + when(localDirectory.getDirectory()).thenReturn(basePath); + when(localDirectory.listAll()).thenReturn(new String[] { "_0.si", "_0.cfs" }); + compositeDirectory.close(); + verify(localDirectory).close(); + verify(fileCache).remove(resolvedPath1); + verify(fileCache).remove(resolvedPath2); + } + + public void testAfterSyncToRemote() throws IOException { + Path basePath = mock(Path.class); + Path resolvedPath = mock(Path.class); + when(basePath.resolve(anyString())).thenReturn(resolvedPath); + when(localDirectory.getDirectory()).thenReturn(basePath); + Collection files = Arrays.asList("_0.si", "_0.cfs"); + compositeDirectory.afterSyncToRemote(files); + verify(fileCache, times(files.size())).decRef(resolvedPath); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index b1e2028d761f0..c738156411dd7 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; @@ -23,34 +22,25 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.Version; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; import org.junit.Before; import java.io.ByteArrayInputStream; @@ -64,7 +54,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -87,95 +76,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { - private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectoryTests.class); - private RemoteDirectory remoteDataDirectory; - private RemoteDirectory remoteMetadataDirectory; - private RemoteStoreMetadataLockManager mdLockManager; - - private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; - private TestUploadListener testUploadTracker; - private IndexShard indexShard; - private SegmentInfos segmentInfos; - private ThreadPool threadPool; - - private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 12, - 23, - 34, - 1, - 1, - "node-1" - ); - - private final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 12, - 23, - 34, - 2, - 1, - "node-2" - ); - private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 12, - 13, - 34, - 1, - 1, - "node-1" - ); - private final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 10, - 38, - 34, - 1, - 1, - "node-1" - ); - private final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 10, - 36, - 34, - 1, - 1, - "node-1" - ); +public class RemoteSegmentStoreDirectoryTests extends BaseRemoteSegmentStoreDirectoryTests { @Before public void setup() throws IOException { - remoteDataDirectory = mock(RemoteDirectory.class); - remoteMetadataDirectory = mock(RemoteDirectory.class); - mdLockManager = mock(RemoteStoreMetadataLockManager.class); - threadPool = mock(ThreadPool.class); - testUploadTracker = new TestUploadListener(); - - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build(); - ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); - - indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( - remoteDataDirectory, - remoteMetadataDirectory, - mdLockManager, - threadPool, - indexShard.shardId() - ); - try (Store store = indexShard.store()) { - segmentInfos = store.readLastCommittedSegmentsInfo(); - } - - when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); - when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); - when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(executorService); - } - - @After - public void tearDown() throws Exception { - indexShard.close("test tearDown", true, false); - super.tearDown(); + setupRemoteSegmentStoreDirectory(); } public void testUploadedSegmentMetadataToString() { @@ -256,60 +161,6 @@ public void testInitMultipleMetadataFile() throws IOException { assertThrows(IllegalStateException.class, () -> remoteSegmentStoreDirectory.init()); } - private Map> populateMetadata() throws IOException { - List metadataFiles = new ArrayList<>(); - - metadataFiles.add(metadataFilename); - metadataFiles.add(metadataFilename2); - metadataFiles.add(metadataFilename3); - - when( - remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - METADATA_FILES_TO_FETCH - ) - ).thenReturn(List.of(metadataFilename)); - when( - remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - Integer.MAX_VALUE - ) - ).thenReturn(metadataFiles); - - Map> metadataFilenameContentMapping = Map.of( - metadataFilename, - getDummyMetadata("_0", 1), - metadataFilename2, - getDummyMetadata("_0", 1), - metadataFilename3, - getDummyMetadata("_0", 1) - ); - - when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenAnswer( - I -> createMetadataFileBytes( - metadataFilenameContentMapping.get(metadataFilename), - indexShard.getLatestReplicationCheckpoint(), - segmentInfos - ) - ); - when(remoteMetadataDirectory.getBlobStream(metadataFilename2)).thenAnswer( - I -> createMetadataFileBytes( - metadataFilenameContentMapping.get(metadataFilename2), - indexShard.getLatestReplicationCheckpoint(), - segmentInfos - ) - ); - when(remoteMetadataDirectory.getBlobStream(metadataFilename3)).thenAnswer( - I -> createMetadataFileBytes( - metadataFilenameContentMapping.get(metadataFilename3), - indexShard.getLatestReplicationCheckpoint(), - segmentInfos - ) - ); - - return metadataFilenameContentMapping; - } - public void testInit() throws IOException { populateMetadata(); diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index a135802c5f49c..c7d0cc0c5b96e 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -207,7 +207,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) { // write 48, -80 alternatively for (int i = 0; i < numOfBlocks; i++) { // create normal blocks - String blockName = BLOCK_FILE_PREFIX + "." + i; + String blockName = BLOCK_FILE_PREFIX + "_block_" + i; IndexOutput output = fsDirectory.createOutput(blockName, null); // since block size is always even number, safe to do division for (int j = 0; j < blockSize / 2; j++) { @@ -221,7 +221,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) { if (numOfBlocks > 1 && sizeOfLastBlock != 0) { // create last block - String lastBlockName = BLOCK_FILE_PREFIX + "." + numOfBlocks; + String lastBlockName = BLOCK_FILE_PREFIX + "_block_" + numOfBlocks; IndexOutput output = fsDirectory.createOutput(lastBlockName, null); for (int i = 0; i < sizeOfLastBlock; i++) { if ((i & 1) == 0) { diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index 7ae3944eb6944..c0a5ea749b765 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -60,7 +60,7 @@ public void setUp() throws Exception { directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE); blobContainer = mock(BlobContainer.class); doAnswer(i -> new ByteArrayInputStream(createData())).when(blobContainer).readBlob(eq("blob"), anyLong(), anyLong()); - transferManager = new TransferManager(blobContainer, fileCache); + transferManager = new TransferManager(blobContainer::readBlob, fileCache); } @After diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index a9f6fdc86155d..603c063a3da47 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2077,6 +2077,10 @@ protected boolean addMockTransportService() { return true; } + protected boolean addMockIndexStorePlugin() { + return true; + } + /** Returns {@code true} iff this test cluster should use a dummy http transport */ protected boolean addMockHttpTransport() { return true; @@ -2119,7 +2123,7 @@ protected Collection> getMockPlugins() { if (randomBoolean() && addMockTransportService()) { mocks.add(MockTransportService.TestPlugin.class); } - if (randomBoolean()) { + if (randomBoolean() && addMockIndexStorePlugin()) { mocks.add(MockFSIndexStore.TestPlugin.class); } if (randomBoolean()) {