From 96f19c27c6f676898f3dadbb9e4243ef9a68bdb5 Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Sat, 7 Dec 2024 02:48:56 +0530 Subject: [PATCH] Derived Source POC Signed-off-by: Shreyansh Ray --- .../org/opensearch/index/IndexService.java | 3 +- .../fielddata/IndexFieldDataService.java | 2 + .../opensearch/index/get/ShardGetService.java | 139 ++++++++++++++++++ .../index/mapper/NumberFieldMapper.java | 4 + .../opensearch/index/shard/IndexShard.java | 10 +- .../opensearch/search/fetch/FetchPhase.java | 139 +++++++++++++++++- 6 files changed, 293 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 83a53a48ec936..ef9eff9c7258b 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -769,7 +769,8 @@ protected void closeInternal() { recoverySettings, remoteStoreSettings, seedRemote, - discoveryNodes + discoveryNodes, + indexFieldData ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java b/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java index 98900482176e5..fa11cc19c6782 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java +++ b/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java @@ -34,6 +34,7 @@ import org.apache.lucene.util.Accountable; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.core.index.shard.ShardId; @@ -59,6 +60,7 @@ * * @opensearch.internal */ +@PublicApi(since = "1.0.0") public class IndexFieldDataService extends AbstractIndexComponent implements Closeable { public static final String FIELDDATA_CACHE_VALUE_NODE = "node"; public static final String FIELDDATA_CACHE_KEY = "index.fielddata.cache"; diff --git a/server/src/main/java/org/opensearch/index/get/ShardGetService.java b/server/src/main/java/org/opensearch/index/get/ShardGetService.java index d4eeb8aae8e24..6264418fbd94d 100644 --- a/server/src/main/java/org/opensearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/opensearch/index/get/ShardGetService.java @@ -37,10 +37,14 @@ import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.IndexableFieldType; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Term; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.util.BytesRef; import org.opensearch.OpenSearchException; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; @@ -51,21 +55,28 @@ import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.metrics.MeanMetric; import org.opensearch.common.util.set.Sets; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.support.XContentMapValues; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.TranslogLeafReader; +import org.opensearch.index.fielddata.LeafNumericFieldData; +import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.index.fieldvisitor.CustomFieldsVisitor; import org.opensearch.index.fieldvisitor.FieldsVisitor; import org.opensearch.index.mapper.DocumentMapper; +import org.opensearch.index.mapper.FieldMapper; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.Mapper; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.MetadataFieldMapper; +import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.mapper.RoutingFieldMapper; import org.opensearch.index.mapper.SourceFieldMapper; @@ -73,14 +84,17 @@ import org.opensearch.index.mapper.Uid; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShard; +import org.opensearch.search.DocValueFormat; import org.opensearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -415,6 +429,18 @@ private GetResult innerGetLoadFromStoredFields( } } + try { + Map sourceAsMap = buildUsingDocValues(docIdAndVersion.docId, docIdAndVersion.reader, mapperService, indexShard); + sourceAsMap = unflatten(sourceAsMap); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.map(sourceAsMap); + source = BytesReference.bytes(builder); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + return new GetResult( shardId.getIndexName(), id, @@ -428,6 +454,41 @@ private GetResult innerGetLoadFromStoredFields( ); } + private static Map unflatten(Map flattened) { + Map unflattened = new HashMap<>(); + for (String key : flattened.keySet()) { + doUnflatten(flattened, unflattened, key, flattened.get(key)); + } + return unflattened; + } + + private static Map doUnflatten( + Map flattened, + Map unflattened, + String key, + Object value) { + + String[] parts = key.split("\\."); + for (int i = 0; i < parts.length; i++) { + String part = parts[i]; + Object current = flattened.get(part); + if (i == (parts.length - 1)) { + unflattened.put(part, value); + } else if (current == null) { + if ((current = unflattened.get(part)) == null) { + current = new HashMap<>(); + } + unflattened.put(part, current); + unflattened = (Map) current; + } else if (current instanceof Map) { + unflattened.put(part, current); + unflattened = (Map) current; + } + } + return unflattened; + } + + private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) { if (fields == null || fields.length == 0) { return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null; @@ -435,4 +496,82 @@ private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceCon return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource()); } + + private static Map buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException { + Map docValues = new HashMap<>(); + for (Mapper mapper: mapperService.documentMapper().mappers()) { + if (mapper instanceof MetadataFieldMapper) { + continue; + } + mapper.name(); + if (mapper instanceof FieldMapper) { + FieldMapper fieldMapper = (FieldMapper) mapper; + if (fieldMapper.fieldType().hasDocValues()) { + String fieldName = fieldMapper.name(); + FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(fieldName); + DocValueFormat format = fieldMapper.fieldType().docValueFormat(null, null); + if (fieldInfo != null) { + switch (fieldInfo.getDocValuesType()) { + case SORTED_SET: + SortedSetDocValues dv = reader.getSortedSetDocValues(fieldName); + if (dv.advanceExact(docId)) { + BytesRef[] values = new BytesRef[dv.docValueCount()]; + for (int i = 0; i < dv.docValueCount(); i++) { + values[i] = dv.lookupOrd(dv.nextOrd()); + } + if (values.length > 1) { + docValues.put(fieldName, Arrays.stream(values).map(format::format).collect(Collectors.toList())); + } else { + docValues.put(fieldName, format.format(values[0])); + } + } + break; + case SORTED_NUMERIC: + SortedNumericDocValues sndv = reader.getSortedNumericDocValues(fieldName); + if (fieldMapper instanceof NumberFieldMapper) { + NumberFieldMapper.NumberType numberType = ((NumberFieldMapper) fieldMapper).getType(); + switch (numberType) { + case HALF_FLOAT: + case FLOAT: + case DOUBLE: + SortedNumericDoubleValues doubleValues = ((LeafNumericFieldData) indexShard.indexFieldDataService().getForField(fieldMapper.fieldType(), "", () -> null) + .load(reader.getContext())).getDoubleValues(); + if (doubleValues.advanceExact(docId)) { + int size = doubleValues.docValueCount(); + double[] vals = new double[size]; + for (int i = 0; i < size; i++) { + vals[i] = doubleValues.nextValue(); + } + if (size > 1) { + docValues.put(fieldName, vals); + } else { + docValues.put(fieldName, vals[0]); + } + } + break; + case INTEGER: + case LONG: + case UNSIGNED_LONG: + if (sndv.advanceExact(docId)) { + int size = sndv.docValueCount(); + long[] vals = new long[size]; + for (int i = 0; i < size; i++) { + vals[i] = sndv.nextValue(); + } + if (size > 1) { + docValues.put(fieldName, vals); + } else { + docValues.put(fieldName, vals[0]); + } + } + } + } + break; + } + } + } + } + } + return docValues; + } } diff --git a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java index 43e975f95757b..f79cbe8fcf152 100644 --- a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java @@ -1817,4 +1817,8 @@ protected void parseCreateField(ParseContext context) throws IOException { public ParametrizedFieldMapper.Builder getMergeBuilder() { return new Builder(simpleName(), type, ignoreMalformedByDefault, coerceByDefault).init(this); } + + public NumberType getType() { + return type; + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index baa1351f15cda..2feffe19c5861 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -135,6 +135,7 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.engine.SegmentsStats; import org.opensearch.index.fielddata.FieldDataStats; +import org.opensearch.index.fielddata.IndexFieldDataService; import org.opensearch.index.fielddata.ShardFieldData; import org.opensearch.index.flush.FlushStats; import org.opensearch.index.get.GetStats; @@ -363,6 +364,7 @@ Runnable getGlobalCheckpointSyncer() { */ private final ShardMigrationState shardMigrationState; private DiscoveryNodes discoveryNodes; + private final IndexFieldDataService indexFieldDataService; public IndexShard( final ShardRouting shardRouting, @@ -393,7 +395,8 @@ public IndexShard( final RecoverySettings recoverySettings, final RemoteStoreSettings remoteStoreSettings, boolean seedRemote, - final DiscoveryNodes discoveryNodes + final DiscoveryNodes discoveryNodes, + final IndexFieldDataService indexFieldDataService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -495,6 +498,11 @@ public boolean shouldCache(Query query) { this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote); this.discoveryNodes = discoveryNodes; + this.indexFieldDataService = indexFieldDataService; + } + + public IndexFieldDataService indexFieldDataService() { + return indexFieldDataService; } public ThreadPool getThreadPool() { diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 1698f41caaf2b..f3ba6983d6fed 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -34,8 +34,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; @@ -43,26 +47,38 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.Weight; import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedBiConsumer; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.collect.Tuple; import org.opensearch.common.document.DocumentField; import org.opensearch.common.lucene.index.SequentialStoredFieldsLeafReader; import org.opensearch.common.lucene.search.Queries; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.support.XContentMapValues; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.text.Text; import org.opensearch.core.tasks.TaskCancelledException; import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexSettings; +import org.opensearch.index.fielddata.LeafNumericFieldData; +import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.index.fieldvisitor.CustomFieldsVisitor; import org.opensearch.index.fieldvisitor.FieldsVisitor; import org.opensearch.index.mapper.DocumentMapper; +import org.opensearch.index.mapper.FieldMapper; import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.Mapper; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.MetadataFieldMapper; +import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.index.mapper.ObjectMapper; import org.opensearch.index.mapper.SourceFieldMapper; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchContextSourcePrinter; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -86,6 +102,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -349,13 +366,131 @@ private HitContext prepareNonNestedHitContext( } HitContext hitContext = new HitContext(hit, subReaderContext, subDocId, lookup.source()); - if (fieldsVisitor.source() != null) { - hitContext.sourceLookup().setSource(fieldsVisitor.source()); + //if (fieldsVisitor.source() != null) { + hitContext.sourceLookup().setSource(fieldsVisitor.source()); + Map sourceAsMap = buildUsingDocValues(docId, subReaderContext.reader(), context.mapperService(), context.indexShard()); + sourceAsMap = unflatten(sourceAsMap); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.map(sourceAsMap); + hitContext.sourceLookup().setSource(BytesReference.bytes(builder)); } + //} return hitContext; } } + private static Map unflatten(Map flattened) { + Map unflattened = new HashMap<>(); + for (String key : flattened.keySet()) { + doUnflatten(flattened, unflattened, key, flattened.get(key)); + } + return unflattened; + } + + private static Map doUnflatten( + Map flattened, + Map unflattened, + String key, + Object value) { + + String[] parts = key.split("\\."); + for (int i = 0; i < parts.length; i++) { + String part = parts[i]; + Object current = flattened.get(part); + if (i == (parts.length - 1)) { + unflattened.put(part, value); + } else if (current == null) { + if ((current = unflattened.get(part)) == null) { + current = new HashMap<>(); + } + unflattened.put(part, current); + unflattened = (Map) current; + } else if (current instanceof Map) { + unflattened.put(part, current); + unflattened = (Map) current; + } + } + return unflattened; + } + + private static Map buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException { + Map docValues = new HashMap<>(); + for (Mapper mapper: mapperService.documentMapper().mappers()) { + if (mapper instanceof MetadataFieldMapper) { + continue; + } + mapper.name(); + if (mapper instanceof FieldMapper) { + FieldMapper fieldMapper = (FieldMapper) mapper; + if (fieldMapper.fieldType().hasDocValues()) { + String fieldName = fieldMapper.name(); + FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(fieldName); + DocValueFormat format = fieldMapper.fieldType().docValueFormat(null, null); + if (fieldInfo != null) { + switch (fieldInfo.getDocValuesType()) { + case SORTED_SET: + SortedSetDocValues dv = reader.getSortedSetDocValues(fieldName); + if (dv.advanceExact(docId)) { + BytesRef[] values = new BytesRef[dv.docValueCount()]; + for (int i = 0; i < dv.docValueCount(); i++) { + values[i] = dv.lookupOrd(dv.nextOrd()); + } + if (values.length > 1) { + docValues.put(fieldName, Arrays.stream(values).map(format::format).collect(Collectors.toList())); + } else { + docValues.put(fieldName, format.format(values[0])); + } + } + break; + case SORTED_NUMERIC: + SortedNumericDocValues sndv = reader.getSortedNumericDocValues(fieldName); + if (fieldMapper instanceof NumberFieldMapper) { + NumberFieldMapper.NumberType numberType = ((NumberFieldMapper) fieldMapper).getType(); + switch (numberType) { + case HALF_FLOAT: + case FLOAT: + case DOUBLE: + SortedNumericDoubleValues doubleValues = ((LeafNumericFieldData) indexShard.indexFieldDataService().getForField(fieldMapper.fieldType(), "", () -> null) + .load(reader.getContext())).getDoubleValues(); + if (doubleValues.advanceExact(docId)) { + int size = doubleValues.docValueCount(); + double[] vals = new double[size]; + for (int i = 0; i < size; i++) { + vals[i] = doubleValues.nextValue(); + } + if (size > 1) { + docValues.put(fieldName, vals); + } else { + docValues.put(fieldName, vals[0]); + } + } + break; + case INTEGER: + case LONG: + case UNSIGNED_LONG: + if (sndv.advanceExact(docId)) { + int size = sndv.docValueCount(); + long[] vals = new long[size]; + for (int i = 0; i < size; i++) { + vals[i] = sndv.nextValue(); + } + if (size > 1) { + docValues.put(fieldName, vals); + } else { + docValues.put(fieldName, vals[0]); + } + } + } + } + break; + } + } + } + } + } + return docValues; + } + /** * Resets the provided {@link HitContext} with information on the current * nested document. This includes the following: