diff --git a/CHANGELOG.md b/CHANGELOG.md index ad70cc8ff2d39..3ba2a79c5fe94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) - [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) +- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) - Bump `asm` from 9.6 to 9.7 ([#12908](https://github.com/opensearch-project/OpenSearch/pull/12908)) -- Bump `net.minidev:json-smart` from 2.5.0 to 2.5.1 ([#12893](https://github.com/opensearch-project/OpenSearch/pull/12893)) +- Bump `net.minidev:json-smart` from 2.5.0 to 2.5.1 ([#12893](https://github.com/opensearch-project/OpenSearch/pull/12893), [#13117](https://github.com/opensearch-project/OpenSearch/pull/13117)) - Bump `netty` from 4.1.107.Final to 4.1.108.Final ([#12924](https://github.com/opensearch-project/OpenSearch/pull/12924)) - Bump `commons-io:commons-io` from 2.15.1 to 2.16.0 ([#12996](https://github.com/opensearch-project/OpenSearch/pull/12996), [#12998](https://github.com/opensearch-project/OpenSearch/pull/12998), [#12999](https://github.com/opensearch-project/OpenSearch/pull/12999)) - Bump `org.apache.commons:commons-compress` from 1.24.0 to 1.26.1 ([#12627](https://github.com/opensearch-project/OpenSearch/pull/12627)) diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java index f2778a97c0c1a..c1f1cbf1d0e91 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java @@ -54,15 +54,19 @@ import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.core.CountRequest; import org.opensearch.client.core.CountResponse; +import org.opensearch.common.geo.ShapeRelation; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.geometry.Rectangle; +import org.opensearch.index.query.GeoShapeQueryBuilder; import org.opensearch.index.query.MatchQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.index.query.ScriptQueryBuilder; import org.opensearch.index.query.TermsQueryBuilder; import org.opensearch.join.aggregations.Children; @@ -102,6 +106,8 @@ import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.SuggestBuilder; import org.opensearch.search.suggest.phrase.PhraseSuggestionBuilder; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.hamcrest.Matchers; import org.junit.Before; @@ -116,6 +122,7 @@ import java.util.concurrent.TimeUnit; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.query.QueryBuilders.geoShapeQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertToXContentEquivalent; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.both; @@ -764,6 +771,228 @@ public void testSearchWithWeirdScriptFields() throws Exception { } } + public void testSearchWithDerivedFields() throws Exception { + // Just testing DerivedField definition from SearchSourceBuilder derivedField() + // We are not testing the full functionality here + Request doc = new Request("PUT", "test/_doc/1"); + doc.setJsonEntity("{\"field\":\"value\"}"); + client().performRequest(doc); + client().performRequest(new Request("POST", "/test/_refresh")); + // Keyword field + { + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "keyword", new Script("emit(params._source[\"field\"])")) + .fetchField("result") + .query(new TermsQueryBuilder("result", "value")) + ); + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals("value", values.get(0)); + + // multi valued + searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField( + "result", + "keyword", + new Script("emit(params._source[\"field\"]);emit(params._source[\"field\"] + \"_2\")") + ) + .query(new TermsQueryBuilder("result", "value_2")) + .fetchField("result") + ); + searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + searchHit = searchResponse.getHits().getAt(0); + values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(2, values.size()); + assertEquals("value", values.get(0)); + assertEquals("value_2", values.get(1)); + } + // Boolean field + { + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "boolean", new Script("emit(((String)params._source[\"field\"]).equals(\"value\"))")) + .query(new TermsQueryBuilder("result", "true")) + .fetchField("result") + ); + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals(true, values.get(0)); + } + // Long field + { + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "long", new Script("emit(Long.MAX_VALUE)")) + .query(new RangeQueryBuilder("result").from(Long.MAX_VALUE - 1).to(Long.MAX_VALUE)) + .fetchField("result") + ); + + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals(Long.MAX_VALUE, values.get(0)); + + // multi-valued + searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "long", new Script("emit(Long.MAX_VALUE); emit(Long.MIN_VALUE);")) + .query(new RangeQueryBuilder("result").from(Long.MIN_VALUE).to(Long.MIN_VALUE + 1)) + .fetchField("result") + ); + + searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + searchHit = searchResponse.getHits().getAt(0); + values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(2, values.size()); + assertEquals(Long.MAX_VALUE, values.get(0)); + assertEquals(Long.MIN_VALUE, values.get(1)); + } + // Double field + { + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "double", new Script("emit(Double.MAX_VALUE)")) + .query(new RangeQueryBuilder("result").from(Double.MAX_VALUE - 1).to(Double.MAX_VALUE)) + .fetchField("result") + ); + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals(Double.MAX_VALUE, values.get(0)); + + // multi-valued + searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "double", new Script("emit(Double.MAX_VALUE); emit(Double.MIN_VALUE);")) + .query(new RangeQueryBuilder("result").from(Double.MIN_VALUE).to(Double.MIN_VALUE + 1)) + .fetchField("result") + ); + + searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + searchHit = searchResponse.getHits().getAt(0); + values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(2, values.size()); + assertEquals(Double.MAX_VALUE, values.get(0)); + assertEquals(Double.MIN_VALUE, values.get(1)); + } + // Date field + { + DateTime date1 = new DateTime(1990, 12, 29, 0, 0, DateTimeZone.UTC); + DateTime date2 = new DateTime(1990, 12, 30, 0, 0, DateTimeZone.UTC); + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "date", new Script("emit(" + date1.getMillis() + "L)")) + .query(new RangeQueryBuilder("result").from(date1.toString()).to(date2.toString())) + .fetchField("result") + ); + + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals(date1.toString(), values.get(0)); + + // multi-valued + searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "date", new Script("emit(" + date1.getMillis() + "L); " + "emit(" + date2.getMillis() + "L)")) + .query(new RangeQueryBuilder("result").from(date1.toString()).to(date2.toString())) + .fetchField("result") + ); + + searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + searchHit = searchResponse.getHits().getAt(0); + values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(2, values.size()); + assertEquals(date1.toString(), values.get(0)); + assertEquals(date2.toString(), values.get(1)); + } + // Geo field + { + GeoShapeQueryBuilder qb = geoShapeQuery("result", new Rectangle(-35, 35, 35, -35)); + qb.relation(ShapeRelation.INTERSECTS); + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "geo_point", new Script("emit(10.0, 20.0)")) + .query(qb) + .fetchField("result") + ); + + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals(10.0, ((HashMap) values.get(0)).get("lat")); + assertEquals(20.0, ((HashMap) values.get(0)).get("lon")); + + // multi-valued + searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "geo_point", new Script("emit(10.0, 20.0); emit(20.0, 30.0);")) + .query(qb) + .fetchField("result") + ); + + searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + searchHit = searchResponse.getHits().getAt(0); + values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(2, values.size()); + assertEquals(10.0, ((HashMap) values.get(0)).get("lat")); + assertEquals(20.0, ((HashMap) values.get(0)).get("lon")); + assertEquals(20.0, ((HashMap) values.get(1)).get("lat")); + assertEquals(30.0, ((HashMap) values.get(1)).get("lon")); + } + // IP field + { + SearchRequest searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource().derivedField("result", "ip", new Script("emit(\"10.0.0.1\")")).fetchField("result") + ); + + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + SearchHit searchHit = searchResponse.getHits().getAt(0); + List values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(1, values.size()); + assertEquals("10.0.0.1", values.get(0)); + + // multi-valued + searchRequest = new SearchRequest("test").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "ip", new Script("emit(\"10.0.0.1\"); emit(\"10.0.0.2\");")) + .fetchField("result") + ); + + searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + searchHit = searchResponse.getHits().getAt(0); + values = searchHit.getFields().get("result").getValues(); + assertNotNull(values); + assertEquals(2, values.size()); + assertEquals("10.0.0.1", values.get(0)); + assertEquals("10.0.0.2", values.get(1)); + + } + + } + public void testSearchScroll() throws Exception { for (int i = 0; i < 100; i++) { XContentBuilder builder = jsonBuilder().startObject().field("field", i).endObject(); diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index 2c51bb4cbea53..cd7175e70e607 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -81,7 +81,7 @@ dependencies { api 'javax.servlet:servlet-api:2.5' api "org.slf4j:slf4j-api:${versions.slf4j}" api "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}" - api 'net.minidev:json-smart:2.5.0' + api 'net.minidev:json-smart:2.5.1' api "io.netty:netty-all:${versions.netty}" implementation "com.fasterxml.woodstox:woodstox-core:${versions.woodstox}" implementation 'org.codehaus.woodstox:stax2-api:4.2.2' diff --git a/plugins/repository-hdfs/licenses/json-smart-2.5.0.jar.sha1 b/plugins/repository-hdfs/licenses/json-smart-2.5.0.jar.sha1 deleted file mode 100644 index 3ec055efa1255..0000000000000 --- a/plugins/repository-hdfs/licenses/json-smart-2.5.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -57a64f421b472849c40e77d2e7cce3a141b41e99 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/json-smart-2.5.1.jar.sha1 b/plugins/repository-hdfs/licenses/json-smart-2.5.1.jar.sha1 new file mode 100644 index 0000000000000..fe23968afce1e --- /dev/null +++ b/plugins/repository-hdfs/licenses/json-smart-2.5.1.jar.sha1 @@ -0,0 +1 @@ +4c11d2808d009132dfbbf947ebf37de6bf266c8e \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java index 2b6a5b4ee6867..dc157681be6fa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java @@ -54,7 +54,7 @@ public static Map prepareRequestMap(String[] indices, ); for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) { final ShardId shardId = new ShardId(index, shardIdNum); - shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + shardIdShardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); } } return shardIdShardAttributesMap; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index f5f9d515f2712..d34a5f4edbaec 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -12,8 +12,6 @@ import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; @@ -36,6 +34,7 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotRestoreException; import org.opensearch.snapshots.SnapshotState; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -55,7 +54,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; @@ -118,7 +116,7 @@ private void assertDocsPresentInIndex(Client client, String indexName, int numOf } } - public void testRestoreOperationsShallowCopyEnabled() throws IOException, ExecutionException, InterruptedException { + public void testRestoreOperationsShallowCopyEnabled() throws Exception { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; @@ -129,8 +127,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut Path absolutePath1 = randomRepoPath().toAbsolutePath(); logger.info("Snapshot Path [{}]", absolutePath1); String restoredIndexName1 = indexName1 + "-restored"; - String restoredIndexName1Seg = indexName1 + "-restored-seg"; - String restoredIndexName1Doc = indexName1 + "-restored-doc"; String restoredIndexName2 = indexName2 + "-restored"; createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true)); @@ -212,60 +208,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2); ensureGreen(restoredIndexName1); assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); - - // restore index as seg rep enabled with remote store and remote translog disabled - RestoreSnapshotResponse restoreSnapshotResponse3 = client.admin() - .cluster() - .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) - .setWaitForCompletion(false) - .setIgnoreIndexSettings(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) - .setIndices(indexName1) - .setRenamePattern(indexName1) - .setRenameReplacement(restoredIndexName1Seg) - .get(); - assertEquals(restoreSnapshotResponse3.status(), RestStatus.ACCEPTED); - ensureGreen(restoredIndexName1Seg); - - GetIndexResponse getIndexResponse = client.admin() - .indices() - .getIndex(new GetIndexRequest().indices(restoredIndexName1Seg).includeDefaults(true)) - .get(); - indexSettings = getIndexResponse.settings().get(restoredIndexName1Seg); - assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); - assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, null)); - assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); - assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1); - // indexing some new docs and validating - indexDocuments(client, restoredIndexName1Seg, numDocsInIndex1, numDocsInIndex1 + 2); - ensureGreen(restoredIndexName1Seg); - assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1 + 2); - - // restore index as doc rep based from shallow copy snapshot - RestoreSnapshotResponse restoreSnapshotResponse4 = client.admin() - .cluster() - .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) - .setWaitForCompletion(false) - .setIgnoreIndexSettings(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, IndexMetadata.SETTING_REPLICATION_TYPE) - .setIndices(indexName1) - .setRenamePattern(indexName1) - .setRenameReplacement(restoredIndexName1Doc) - .get(); - assertEquals(restoreSnapshotResponse4.status(), RestStatus.ACCEPTED); - ensureGreen(restoredIndexName1Doc); - - getIndexResponse = client.admin() - .indices() - .getIndex(new GetIndexRequest().indices(restoredIndexName1Doc).includeDefaults(true)) - .get(); - indexSettings = getIndexResponse.settings().get(restoredIndexName1Doc); - assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); - assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, null)); - assertNull(indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); - assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1); - // indexing some new docs and validating - indexDocuments(client, restoredIndexName1Doc, numDocsInIndex1, numDocsInIndex1 + 2); - ensureGreen(restoredIndexName1Doc); - assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2); } /** @@ -579,83 +521,6 @@ protected IndexShard getIndexShard(String node, String indexName) { return shardId.map(indexService::getShard).orElse(null); } - public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException { - String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String primary = internalCluster().startDataOnlyNode(); - String indexName1 = "testindex1"; - String indexName2 = "testindex2"; - String snapshotRepoName = "test-restore-snapshot-repo"; - String remoteStoreRepo2Name = "test-rs-repo-2" + TEST_REMOTE_STORE_REPO_SUFFIX; - String snapshotName1 = "test-restore-snapshot1"; - Path absolutePath1 = randomRepoPath().toAbsolutePath(); - Path absolutePath3 = randomRepoPath().toAbsolutePath(); - String restoredIndexName1 = indexName1 + "-restored"; - - createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false)); - createRepository(remoteStoreRepo2Name, "fs", absolutePath3); - - Client client = client(); - Settings indexSettings = getIndexSettings(1, 0).build(); - createIndex(indexName1, indexSettings); - - Settings indexSettings2 = getIndexSettings(1, 0).build(); - createIndex(indexName2, indexSettings2); - - final int numDocsInIndex1 = 5; - final int numDocsInIndex2 = 6; - indexDocuments(client, indexName1, numDocsInIndex1); - indexDocuments(client, indexName2, numDocsInIndex2); - ensureGreen(indexName1, indexName2); - - internalCluster().startDataOnlyNode(); - - logger.info("--> snapshot"); - SnapshotInfo snapshotInfo1 = createSnapshot( - snapshotRepoName, - snapshotName1, - new ArrayList<>(Arrays.asList(indexName1, indexName2)) - ); - assertThat(snapshotInfo1.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards())); - assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS)); - - Settings remoteStoreIndexSettings = Settings.builder() - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStoreRepo2Name) - .build(); - // restore index as a remote store index with different remote store repo - RestoreSnapshotResponse restoreSnapshotResponse = client.admin() - .cluster() - .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) - .setWaitForCompletion(false) - .setIndexSettings(remoteStoreIndexSettings) - .setIndices(indexName1) - .setRenamePattern(indexName1) - .setRenameReplacement(restoredIndexName1) - .get(); - assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); - ensureGreen(restoredIndexName1); - assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1); - - // deleting data for restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); - // Re-initialize client to make sure we are not using client from stopped node. - client = client(clusterManagerNode); - assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); - client.admin() - .cluster() - .restoreRemoteStore( - new RestoreRemoteStoreRequest().indices(restoredIndexName1).restoreAllShards(true), - PlainActionFuture.newFuture() - ); - ensureYellowAndNoInitializingShards(restoredIndexName1); - ensureGreen(restoredIndexName1); - // indexing some new docs and validating - assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1); - indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2); - ensureGreen(restoredIndexName1); - assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); - } - public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException { String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -787,4 +652,98 @@ public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionExcep assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); } + public void testInvalidRestoreRequestScenarios() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + String index = "test-index"; + String snapshotRepo = "test-restore-snapshot-repo"; + String newRemoteStoreRepo = "test-new-rs-repo"; + String snapshotName1 = "test-restore-snapshot1"; + String snapshotName2 = "test-restore-snapshot2"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + String restoredIndex = index + "-restored"; + + createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true)); + + Client client = client(); + Settings indexSettings = getIndexSettings(1, 0).build(); + createIndex(index, indexSettings); + + final int numDocsInIndex = 5; + indexDocuments(client, index, numDocsInIndex); + ensureGreen(index); + + internalCluster().startDataOnlyNode(); + logger.info("--> snapshot"); + + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepo, snapshotName1, new ArrayList<>(List.of(index))); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + + updateRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, false)); + SnapshotInfo snapshotInfo2 = createSnapshot(snapshotRepo, snapshotName2, new ArrayList<>(List.of(index))); + assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo2.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards())); + + DeleteResponse deleteResponse = client().prepareDelete(index, "0").execute().actionGet(); + assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED); + indexDocuments(client, index, numDocsInIndex, numDocsInIndex + randomIntBetween(2, 5)); + ensureGreen(index); + + // try index restore with remote store disabled + SnapshotRestoreException exception = expectThrows( + SnapshotRestoreException.class, + () -> client().admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepo, snapshotName1) + .setWaitForCompletion(false) + .setIgnoreIndexSettings(SETTING_REMOTE_STORE_ENABLED) + .setIndices(index) + .setRenamePattern(index) + .setRenameReplacement(restoredIndex) + .get() + ); + assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.enabled] on restore")); + + // try index restore with remote store repository modified + Settings remoteStoreIndexSettings = Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, newRemoteStoreRepo) + .build(); + + exception = expectThrows( + SnapshotRestoreException.class, + () -> client().admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepo, snapshotName1) + .setWaitForCompletion(false) + .setIndexSettings(remoteStoreIndexSettings) + .setIndices(index) + .setRenamePattern(index) + .setRenameReplacement(restoredIndex) + .get() + ); + assertTrue(exception.getMessage().contains("cannot modify setting [index.remote_store.segment.repository]" + " on restore")); + + // try index restore with remote store repository and translog store repository disabled + exception = expectThrows( + SnapshotRestoreException.class, + () -> client().admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepo, snapshotName1) + .setWaitForCompletion(false) + .setIgnoreIndexSettings( + IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, + IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY + ) + .setIndices(index) + .setRenamePattern(index) + .setRenameReplacement(restoredIndex) + .get() + ); + assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore")); + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java index 906d45ef84b3f..2ce96092203e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java @@ -40,6 +40,7 @@ import org.opensearch.common.Numbers; import org.opensearch.common.collect.MapBuilder; import org.opensearch.common.document.DocumentField; +import org.opensearch.common.geo.GeoPoint; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; import org.opensearch.common.time.DateUtils; @@ -51,6 +52,7 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.fielddata.ScriptDocValues; +import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.query.QueryBuilders; import org.opensearch.plugins.Plugin; @@ -189,6 +191,20 @@ protected Map, Object>> pluginScripts() { scripts.put("doc['s']", vars -> docScript(vars, "s")); scripts.put("doc['ms']", vars -> docScript(vars, "ms")); + scripts.put("doc['keyword_field']", vars -> sourceScript(vars, "keyword_field")); + scripts.put("doc['multi_keyword_field']", vars -> sourceScript(vars, "multi_keyword_field")); + scripts.put("doc['long_field']", vars -> sourceScript(vars, "long_field")); + scripts.put("doc['multi_long_field']", vars -> sourceScript(vars, "multi_long_field")); + scripts.put("doc['double_field']", vars -> sourceScript(vars, "double_field")); + scripts.put("doc['multi_double_field']", vars -> sourceScript(vars, "multi_double_field")); + scripts.put("doc['date_field']", vars -> sourceScript(vars, "date_field")); + scripts.put("doc['multi_date_field']", vars -> sourceScript(vars, "multi_date_field")); + scripts.put("doc['ip_field']", vars -> sourceScript(vars, "ip_field")); + scripts.put("doc['multi_ip_field']", vars -> sourceScript(vars, "multi_ip_field")); + scripts.put("doc['boolean_field']", vars -> sourceScript(vars, "boolean_field")); + scripts.put("doc['geo_field']", vars -> sourceScript(vars, "geo_field")); + scripts.put("doc['multi_geo_field']", vars -> sourceScript(vars, "multi_geo_field")); + return scripts; } @@ -1299,6 +1315,147 @@ public void testScriptFields() throws Exception { } } + public void testDerivedFields() throws Exception { + assertAcked( + prepareCreate("index").setMapping( + "keyword_field", + "type=keyword", + "multi_keyword_field", + "type=keyword", + "long_field", + "type=long", + "multi_long_field", + "type=long", + "double_field", + "type=double", + "multi_double_field", + "type=double", + "date_field", + "type=date", + "multi_date_field", + "type=date", + "ip_field", + "type=ip", + "multi_ip_field", + "type=ip", + "boolean_field", + "type=boolean", + "geo_field", + "type=geo_point", + "multi_geo_field", + "type=geo_point" + ).get() + ); + final int numDocs = randomIntBetween(3, 8); + List reqs = new ArrayList<>(); + + DateTime date1 = new DateTime(1990, 12, 29, 0, 0, DateTimeZone.UTC); + DateTime date2 = new DateTime(1990, 12, 30, 0, 0, DateTimeZone.UTC); + + for (int i = 0; i < numDocs; ++i) { + reqs.add( + client().prepareIndex("index") + .setId(Integer.toString(i)) + .setSource( + "keyword_field", + Integer.toString(i), + "multi_keyword_field", + new String[] { Integer.toString(i), Integer.toString(i + 1) }, + "long_field", + (long) i, + "multi_long_field", + new long[] { i, i + 1 }, + "double_field", + (double) i, + "multi_double_field", + new double[] { i, i + 1 }, + "date_field", + date1.getMillis(), + "multi_date_field", + new Long[] { date1.getMillis(), date2.getMillis() }, + "ip_field", + "172.16.1.10", + "multi_ip_field", + new String[] { "172.16.1.10", "172.16.1.11" }, + "boolean_field", + true, + "geo_field", + new GeoPoint(12.0, 10.0), + "multi_geo_field", + new GeoPoint[] { new GeoPoint(12.0, 10.0), new GeoPoint(13.0, 10.0) } + ) + ); + } + indexRandom(true, reqs); + indexRandomForConcurrentSearch("index"); + ensureSearchable(); + SearchRequestBuilder req = client().prepareSearch("index"); + String[][] fieldLookup = new String[][] { + { "keyword_field", "keyword" }, + { "multi_keyword_field", "keyword" }, + { "long_field", "long" }, + { "multi_long_field", "long" }, + { "double_field", "double" }, + { "multi_double_field", "double" }, + { "date_field", "date" }, + { "multi_date_field", "date" }, + { "ip_field", "ip" }, + { "multi_ip_field", "ip" }, + { "boolean_field", "boolean" }, + { "geo_field", "geo_point" }, + { "multi_geo_field", "geo_point" } }; + for (String[] field : fieldLookup) { + req.addDerivedField( + "derived_" + field[0], + field[1], + new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "doc['" + field[0] + "']", Collections.emptyMap()) + ); + } + req.addFetchField("derived_*"); + SearchResponse resp = req.get(); + assertSearchResponse(resp); + for (SearchHit hit : resp.getHits().getHits()) { + final int id = Integer.parseInt(hit.getId()); + Map fields = hit.getFields(); + + assertEquals(fields.get("derived_keyword_field").getValues().get(0), Integer.toString(id)); + assertEquals(fields.get("derived_multi_keyword_field").getValues().get(0), Integer.toString(id)); + assertEquals(fields.get("derived_multi_keyword_field").getValues().get(1), Integer.toString(id + 1)); + + assertEquals(fields.get("derived_long_field").getValues().get(0), id); + assertEquals(fields.get("derived_multi_long_field").getValues().get(0), id); + assertEquals(fields.get("derived_multi_long_field").getValues().get(1), (id + 1)); + + assertEquals(fields.get("derived_double_field").getValues().get(0), (double) id); + assertEquals(fields.get("derived_multi_double_field").getValues().get(0), (double) id); + assertEquals(fields.get("derived_multi_double_field").getValues().get(1), (double) (id + 1)); + + assertEquals( + fields.get("derived_date_field").getValues().get(0), + DateFieldMapper.getDefaultDateTimeFormatter().formatJoda(date1) + ); + assertEquals( + fields.get("derived_multi_date_field").getValues().get(0), + DateFieldMapper.getDefaultDateTimeFormatter().formatJoda(date1) + ); + assertEquals( + fields.get("derived_multi_date_field").getValues().get(1), + DateFieldMapper.getDefaultDateTimeFormatter().formatJoda(date2) + ); + + assertEquals(fields.get("derived_ip_field").getValues().get(0), "172.16.1.10"); + assertEquals(fields.get("derived_multi_ip_field").getValues().get(0), "172.16.1.10"); + assertEquals(fields.get("derived_multi_ip_field").getValues().get(1), "172.16.1.11"); + + assertEquals(fields.get("derived_boolean_field").getValues().get(0), true); + + assertEquals(fields.get("derived_geo_field").getValues().get(0), new GeoPoint(12.0, 10.0)); + assertEquals(fields.get("derived_multi_geo_field").getValues().get(0), new GeoPoint(12.0, 10.0)); + assertEquals(fields.get("derived_multi_geo_field").getValues().get(1), new GeoPoint(13.0, 10.0)); + + } + } + public void testDocValueFieldsWithFieldAlias() throws Exception { XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java index b019bb57743c9..df1fc9b833171 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -112,12 +112,16 @@ public void createSnapshot() { } public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { + return restoreSnapshotWithSettings(indexSettings, RESTORED_INDEX_NAME); + } + + public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings, String restoredIndexName) { RestoreSnapshotRequestBuilder builder = client().admin() .cluster() .prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) .setWaitForCompletion(false) .setRenamePattern(INDEX_NAME) - .setRenameReplacement(RESTORED_INDEX_NAME); + .setRenameReplacement(restoredIndexName); if (indexSettings != null) { builder.setIndexSettings(indexSettings); } @@ -311,7 +315,8 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio * 2. Snapshot index * 3. Add new set of nodes with `cluster.indices.replication.strategy` set to SEGMENT and `cluster.index.restrict.replication.type` * set to true. - * 4. Perform restore on new set of nodes to validate restored index has `DOCUMENT` replication. + * 4. Perform restore on new set of nodes to validate restored index has `SEGMENT` replication. + * 5. Validate that if replication type is passed as DOCUMENT as request parameter, restore operation fails */ public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception { final int documentCount = scaledRandomIntBetween(1, 10); @@ -337,9 +342,20 @@ public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception { createSnapshot(); - // Delete index + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings(), RESTORED_INDEX_NAME); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString()); + + // Delete indices assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(RESTORED_INDEX_NAME)).get()); + assertFalse("index [" + RESTORED_INDEX_NAME + "] should have been deleted", indexExists(RESTORED_INDEX_NAME)); // Start new set of nodes with cluster level replication type setting and restrict replication type setting. Settings settings = Settings.builder() @@ -361,7 +377,25 @@ public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception { // Perform snapshot restore logger.info("--> Performing snapshot restore to target index"); - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> restoreSnapshotWithSettings(null)); + restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + ensureGreen(RESTORED_INDEX_NAME); + settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString()); + + // restore index with cluster default setting + restoreSnapshotWithSettings(restoreIndexSegRepSettings(), RESTORED_INDEX_NAME + "1"); + + // Perform Snapshot Restore with different index name + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> restoreSnapshotWithSettings(restoreIndexDocRepSettings(), RESTORED_INDEX_NAME + "2") + ); assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage()); } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java index 9dac827e7d518..4a547ee2c82bd 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java @@ -363,6 +363,21 @@ public SearchRequestBuilder addScriptField(String name, Script script) { return this; } + /** + * Adds a derived field of a given type. The script provided will be used to derive the value + * of a given type. Thereafter, it can be treated as regular field of a given type to perform + * query on them. + * + * @param name The name of the field to be used in various parts of the query. The name will also represent + * the field value in the return hit. + * @param type The type of derived field. All values emitted by script must be of this type + * @param script The script to use + */ + public SearchRequestBuilder addDerivedField(String name, String type, Script script) { + sourceBuilder().derivedField(name, type, script); + return this; + } + /** * Adds a sort against the given field name and the sort ordering. * diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 64bea79c9e47b..a180876685f9c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -993,7 +993,7 @@ static Settings aggregateIndexSettings( * @param clusterSettings cluster level settings * @param combinedTemplateSettings combined template settings which satisfy the index */ - private static void updateReplicationStrategy( + public static void updateReplicationStrategy( Settings.Builder settingsBuilder, Settings requestSettings, Settings clusterSettings, @@ -1008,7 +1008,7 @@ private static void updateReplicationStrategy( final ReplicationType indexReplicationType; if (INDEX_REPLICATION_TYPE_SETTING.exists(requestSettings)) { indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(requestSettings); - } else if (INDEX_REPLICATION_TYPE_SETTING.exists(combinedTemplateSettings)) { + } else if (combinedTemplateSettings != null && INDEX_REPLICATION_TYPE_SETTING.exists(combinedTemplateSettings)) { indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings); } else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) { indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings); @@ -1023,20 +1023,20 @@ private static void updateReplicationStrategy( /** * Updates index settings to enable remote store by default based on node attributes * @param settingsBuilder index settings builder to be updated with relevant settings - * @param clusterSettings cluster level settings + * @param nodeSettings node settings */ - private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) { - if (isRemoteDataAttributePresent(clusterSettings)) { + public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings nodeSettings) { + if (RemoteStoreNodeAttribute.isRemoteStoreAttributePresent(nodeSettings)) { settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) .put( SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, - clusterSettings.get( + nodeSettings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY ) ) .put( SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, - clusterSettings.get( + nodeSettings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY ) ); diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java new file mode 100644 index 0000000000000..4f39a39cea678 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -0,0 +1,243 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.logging.Loggers; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +import reactor.util.annotation.NonNull; + +/** + * Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch + * part using the base class {@link AsyncShardFetch}. Other functionalities needed for a batch are only written here. + * This separation also takes care of the extra generic type V which is only needed for batch + * transport actions like {@link TransportNodesListGatewayStartedShardsBatch} and + * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}. + * + * @param Response type of the transport action. + * @param Data type of shard level response. + * + * @opensearch.internal + */ +public abstract class AsyncShardBatchFetch extends AsyncShardFetch { + + @SuppressWarnings("unchecked") + AsyncShardBatchFetch( + Logger logger, + String type, + Map shardAttributesMap, + AsyncShardFetch.Lister, T> action, + String batchId, + Class clazz, + V emptyShardResponse, + Predicate emptyShardResponsePredicate, + ShardBatchResponseFactory responseFactory + ) { + super( + logger, + type, + shardAttributesMap, + action, + batchId, + new ShardBatchCache<>( + logger, + type, + shardAttributesMap, + "BatchID=[" + batchId + "]", + clazz, + emptyShardResponse, + emptyShardResponsePredicate, + responseFactory + ) + ); + } + + /** + * Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's + * assigned or failed. + * + * @param shardId shardId to be removed from the batch. + */ + public synchronized void clearShard(ShardId shardId) { + this.shardAttributesMap.remove(shardId); + this.cache.deleteShard(shardId); + } + + /** + * Cache implementation of transport actions returning batch of shards related data in the response. + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or + * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching + * approach. This cache class is not thread safe, all of its methods are being called from + * {@link AsyncShardFetch} class which has synchronized blocks present to handle multiple threads. + * + * @param Response type of transport action. + * @param Data type of shard level response. + */ + static class ShardBatchCache extends AsyncShardFetchCache { + private final Map> cache; + private final Map shardIdToArray; + private final int batchSize; + private final Class shardResponseClass; + private final ShardBatchResponseFactory responseFactory; + private final V emptyResponse; + private final Predicate emptyShardResponsePredicate; + private final Logger logger; + + public ShardBatchCache( + Logger logger, + String type, + Map shardAttributesMap, + String logKey, + Class clazz, + V emptyResponse, + Predicate emptyShardResponsePredicate, + ShardBatchResponseFactory responseFactory + ) { + super(Loggers.getLogger(logger, "_" + logKey), type); + this.batchSize = shardAttributesMap.size(); + this.emptyShardResponsePredicate = emptyShardResponsePredicate; + cache = new HashMap<>(); + shardIdToArray = new HashMap<>(); + fillShardIdKeys(shardAttributesMap.keySet()); + this.shardResponseClass = clazz; + this.emptyResponse = emptyResponse; + this.logger = logger; + this.responseFactory = responseFactory; + } + + @Override + @NonNull + public Map getCache() { + return cache; + } + + @Override + public void deleteShard(ShardId shardId) { + if (shardIdToArray.containsKey(shardId)) { + Integer shardIdIndex = shardIdToArray.remove(shardId); + for (String nodeId : cache.keySet()) { + cache.get(nodeId).clearShard(shardIdIndex); + } + } + } + + @Override + public void initData(DiscoveryNode node) { + cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate)); + } + + /** + * Put the response received from data nodes into the cache. + * Get shard level data from batch, then filter out if any shards received failures. + * After that complete storing the data at node level and mark fetching as done. + * + * @param node node from which we got the response. + * @param response shard metadata coming from node. + */ + @Override + public void putData(DiscoveryNode node, T response) { + NodeEntry nodeEntry = cache.get(node.getId()); + Map batchResponse = responseFactory.getShardBatchData(response); + nodeEntry.doneFetching(batchResponse, shardIdToArray); + } + + @Override + public T getData(DiscoveryNode node) { + return this.responseFactory.getNewResponse(node, getBatchData(cache.get(node.getId()))); + } + + private HashMap getBatchData(NodeEntry nodeEntry) { + V[] nodeShardEntries = nodeEntry.getData(); + boolean[] emptyResponses = nodeEntry.getEmptyShardResponse(); + HashMap shardData = new HashMap<>(); + for (Map.Entry shardIdEntry : shardIdToArray.entrySet()) { + ShardId shardId = shardIdEntry.getKey(); + Integer arrIndex = shardIdEntry.getValue(); + if (emptyResponses[arrIndex]) { + shardData.put(shardId, emptyResponse); + } else if (nodeShardEntries[arrIndex] != null) { + // ignore null responses here + shardData.put(shardId, nodeShardEntries[arrIndex]); + } + } + return shardData; + } + + private void fillShardIdKeys(Set shardIds) { + int shardIdIndex = 0; + for (ShardId shardId : shardIds) { + this.shardIdToArray.putIfAbsent(shardId, shardIdIndex++); + } + } + + /** + * A node entry, holding the state of the fetched data for a specific shard + * for a giving node. + */ + static class NodeEntry extends BaseNodeEntry { + private final V[] shardData; + private final boolean[] emptyShardResponse; // we can not rely on null entries of the shardData array, + // those null entries means that we need to ignore those entries. Empty responses on the other hand are + // actually needed in allocation/explain API response. So instead of storing full empty response object + // in cache, it's better to just store a boolean and create that object on the fly just before + // decision-making. + private final Predicate emptyShardResponsePredicate; + + NodeEntry(String nodeId, Class clazz, int batchSize, Predicate emptyShardResponsePredicate) { + super(nodeId); + this.shardData = (V[]) Array.newInstance(clazz, batchSize); + this.emptyShardResponse = new boolean[batchSize]; + this.emptyShardResponsePredicate = emptyShardResponsePredicate; + } + + void doneFetching(Map shardDataFromNode, Map shardIdKey) { + fillShardData(shardDataFromNode, shardIdKey); + super.doneFetching(); + } + + void clearShard(Integer shardIdIndex) { + this.shardData[shardIdIndex] = null; + emptyShardResponse[shardIdIndex] = false; + } + + V[] getData() { + return this.shardData; + } + + boolean[] getEmptyShardResponse() { + return emptyShardResponse; + } + + private void fillShardData(Map shardDataFromNode, Map shardIdKey) { + for (Map.Entry shardData : shardDataFromNode.entrySet()) { + if (shardData.getValue() != null) { + ShardId shardId = shardData.getKey(); + if (emptyShardResponsePredicate.test(shardData.getValue())) { + this.emptyShardResponse[shardIdKey.get(shardId)] = true; + this.shardData[shardIdKey.get(shardId)] = null; + } else { + this.shardData[shardIdKey.get(shardId)] = shardData.getValue(); + } + } + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 3d129d4794a10..b664dd573ce67 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -82,10 +82,10 @@ public interface Lister, N protected final String type; protected final Map shardAttributesMap; private final Lister, T> action; - private final AsyncShardFetchCache cache; + protected final AsyncShardFetchCache cache; private final AtomicLong round = new AtomicLong(); private boolean closed; - private final String reroutingKey; + final String reroutingKey; private final Map> shardToIgnoreNodes = new HashMap<>(); @SuppressWarnings("unchecked") @@ -99,7 +99,7 @@ protected AsyncShardFetch( this.logger = logger; this.type = type; shardAttributesMap = new HashMap<>(); - shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + shardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; cache = new ShardCache<>(logger, reroutingKey, type); @@ -120,14 +120,15 @@ protected AsyncShardFetch( String type, Map shardAttributesMap, Lister, T> action, - String batchId + String batchId, + AsyncShardFetchCache cache ) { this.logger = logger; this.type = type; this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; this.reroutingKey = "BatchID=[" + batchId + "]"; - cache = new ShardCache<>(logger, reroutingKey, type); + this.cache = cache; } @Override diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 3140ceef4f3ee..2a4e6181467b0 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -48,6 +48,7 @@ * @opensearch.internal */ public abstract class AsyncShardFetchCache { + private final Logger logger; private final String type; diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 8d222903b6f29..1979f33484d49 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.gateway.AsyncShardFetch.FetchResult; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; @@ -132,9 +133,7 @@ private static List adaptToNodeShardStates( // build data for a shard from all the nodes nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { - TransportNodesGatewayStartedShardHelper.GatewayStartedShard shardData = nodeGatewayStartedShardsBatch - .getNodeGatewayStartedShardsBatch() - .get(unassignedShard.shardId()); + GatewayStartedShard shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId()); nodeShardStates.add( new NodeGatewayStartedShard( shardData.allocationId(), diff --git a/server/src/main/java/org/opensearch/gateway/ShardBatchResponseFactory.java b/server/src/main/java/org/opensearch/gateway/ShardBatchResponseFactory.java new file mode 100644 index 0000000000000..4b85ef995f1e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/ShardBatchResponseFactory.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; + +import java.util.Map; + +/** + * A factory class to create new responses of batch transport actions like + * {@link TransportNodesListGatewayStartedShardsBatch} or {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} + * + * @param Node level response returned by batch transport actions. + * @param Shard level metadata returned by batch transport actions. + */ +public class ShardBatchResponseFactory { + private final boolean primary; + + public ShardBatchResponseFactory(boolean primary) { + this.primary = primary; + } + + public T getNewResponse(DiscoveryNode node, Map shardData) { + if (primary) { + return (T) new NodeGatewayStartedShardsBatch(node, (Map) shardData); + } else { + return (T) new NodeStoreFilesMetadataBatch(node, (Map) shardData); + } + } + + public Map getShardBatchData(T response) { + if (primary) { + return (Map) ((NodeGatewayStartedShardsBatch) response).getNodeGatewayStartedShardsBatch(); + } else { + return (Map) ((NodeStoreFilesMetadataBatch) response).getNodeStoreFilesMetadataBatch(); + } + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java index 27cce76b1b694..2ddae1d8410c9 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -42,6 +42,8 @@ * @opensearch.internal */ public class TransportNodesGatewayStartedShardHelper { + public static final String INDEX_NOT_FOUND = "node doesn't have meta data for index"; + public static GatewayStartedShard getShardInfoOnLocalNode( Logger logger, final ShardId shardId, @@ -72,7 +74,7 @@ public static GatewayStartedShard getShardInfoOnLocalNode( customDataPath = new IndexSettings(metadata, settings).customDataPath(); } else { logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + throw new OpenSearchException(INDEX_NOT_FOUND + " " + shardId.getIndex()); } } // we don't have an open shard on the store, validate the files on disk are openable @@ -230,6 +232,13 @@ public String toString() { buf.append("]"); return buf.toString(); } + + public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) { + return gatewayStartedShard.allocationId() == null + && gatewayStartedShard.primary() == false + && gatewayStartedShard.storeException() == null + && gatewayStartedShard.replicationCheckpoint() == null; + } } /** diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java index dc5d85b17bc32..89362988b4d85 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -8,7 +8,6 @@ package org.opensearch.gateway; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionType; import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.ActionFilters; @@ -27,7 +26,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.store.ShardAttributes; import org.opensearch.threadpool.ThreadPool; @@ -40,6 +38,8 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.INDEX_NOT_FOUND; import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode; /** @@ -136,8 +136,10 @@ protected NodesGatewayStartedShardsBatch newResponse( @Override protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { Map shardsOnNode = new HashMap<>(); - for (ShardAttributes shardAttr : request.shardAttributes.values()) { - final ShardId shardId = shardAttr.getShardId(); + // NOTE : If we ever change this for loop to run in parallel threads, we should re-visit the exception + // handling in AsyncShardBatchFetch class. + for (Map.Entry shardAttr : request.shardAttributes.entrySet()) { + final ShardId shardId = shardAttr.getKey(); try { shardsOnNode.put( shardId, @@ -147,16 +149,19 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { namedXContentRegistry, nodeEnv, indicesService, - shardAttr.getCustomDataPath(), + shardAttr.getValue().getCustomDataPath(), settings, clusterService ) ); } catch (Exception e) { - shardsOnNode.put( - shardId, - new GatewayStartedShard(null, false, null, new OpenSearchException("failed to load started shards", e)) - ); + // should return null in case of known exceptions being returned from getShardInfoOnLocalNode method. + if (e instanceof IllegalStateException || e.getMessage().contains(INDEX_NOT_FOUND) || e instanceof IOException) { + shardsOnNode.put(shardId, null); + } else { + // return actual exception as it is for unknown exceptions + shardsOnNode.put(shardId, new GatewayStartedShard(null, false, null, e)); + } } } return new NodeGatewayStartedShardsBatch(clusterService.localNode(), shardsOnNode); @@ -264,13 +269,26 @@ public Map getNodeGatewayStartedShardsBatch() { public NodeGatewayStartedShardsBatch(StreamInput in) throws IOException { super(in); - this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, GatewayStartedShard::new); + this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, i -> { + if (i.readBoolean()) { + return new GatewayStartedShard(i); + } else { + return null; + } + }); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> { + if (v != null) { + o.writeBoolean(true); + v.writeTo(o); + } else { + o.writeBoolean(false); + } + }); } public NodeGatewayStartedShardsBatch(DiscoveryNode node, Map nodeGatewayStartedShardsBatch) { diff --git a/server/src/main/java/org/opensearch/index/mapper/DerivedField.java b/server/src/main/java/org/opensearch/index/mapper/DerivedField.java new file mode 100644 index 0000000000000..7ebe4e5f0b0e8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/mapper/DerivedField.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.mapper; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.script.Script; + +import java.io.IOException; +import java.util.Objects; + +/** + * DerivedField representation: expects a name, type and script. + */ +@PublicApi(since = "2.14.0") +public class DerivedField implements Writeable, ToXContentFragment { + + private final String name; + private final String type; + private final Script script; + + public DerivedField(String name, String type, Script script) { + this.name = name; + this.type = type; + this.script = script; + } + + public DerivedField(StreamInput in) throws IOException { + name = in.readString(); + type = in.readString(); + script = new Script(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeString(type); + script.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(name); + builder.field("type", type); + builder.field("script", script); + builder.endObject(); + return builder; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public Script getScript() { + return script; + } + + @Override + public int hashCode() { + return Objects.hash(name, type, script); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + DerivedField other = (DerivedField) obj; + return Objects.equals(name, other.name) && Objects.equals(type, other.type) && Objects.equals(script, other.script); + } + +} diff --git a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java index b448487a4f810..c6ae71320c35c 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java @@ -14,7 +14,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; /** @@ -37,7 +39,7 @@ private static DerivedFieldMapper toType(FieldMapper in) { */ public static class Builder extends ParametrizedFieldMapper.Builder { // TODO: The type of parameter may change here if the actual underlying FieldType object is needed - private final Parameter type = Parameter.stringParam("type", false, m -> toType(m).type, "text"); + private final Parameter type = Parameter.stringParam("type", false, m -> toType(m).type, ""); private final Parameter