From 1c3b9461dc1eb2add5c7ccc168ee2bc444f43bc4 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Wed, 4 Dec 2024 15:04:47 -0800 Subject: [PATCH 1/8] Add verbose pipeline parameter to output each processor's execution details Signed-off-by: Junwei Dai --- .../action/search/SearchResponse.java | 5 +- .../action/search/SearchResponseSections.java | 27 ++- .../rest/action/search/RestSearchAction.java | 3 + .../org/opensearch/search/SearchHits.java | 17 ++ .../search/builder/SearchSourceBuilder.java | 34 +++- .../internal/InternalSearchResponse.java | 43 +++- .../opensearch/search/pipeline/Pipeline.java | 27 +++ .../pipeline/PipelineProcessingContext.java | 33 ++++ .../pipeline/ProcessorExecutionDetail.java | 183 ++++++++++++++++++ .../action/search/SearchResponseTests.java | 6 +- .../search/GenericSearchExtBuilderTests.java | 4 +- .../builder/SearchSourceBuilderTests.java | 24 +++ .../pipeline/SearchPipelineServiceTests.java | 82 +++++++- 13 files changed, 470 insertions(+), 18 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 899c71e91e3ab..90d8232a82c8b 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -59,6 +59,7 @@ import org.opensearch.search.aggregations.Aggregations; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.suggest.Suggest; @@ -394,6 +395,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE List failures = new ArrayList<>(); Clusters clusters = Clusters.EMPTY; List extBuilders = new ArrayList<>(); + List processorResult = new ArrayList<>(); for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) { if (token == Token.FIELD_NAME) { currentFieldName = parser.currentName(); @@ -530,7 +532,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE terminatedEarly, profile, numReducePhases, - extBuilders + extBuilders, + processorResult ); return new SearchResponse( searchResponseSections, diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java index bca2c8a52b691..fa4b0030148f5 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java @@ -40,6 +40,7 @@ import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.suggest.Suggest; @@ -65,7 +66,6 @@ public class SearchResponseSections implements ToXContentFragment { public static final ParseField EXT_FIELD = new ParseField("ext"); - protected final SearchHits hits; protected final Aggregations aggregations; protected final Suggest suggest; @@ -74,6 +74,7 @@ public class SearchResponseSections implements ToXContentFragment { protected final Boolean terminatedEarly; protected final int numReducePhases; protected final List searchExtBuilders = new ArrayList<>(); + protected final List processorResult = new ArrayList<>(); public SearchResponseSections( SearchHits hits, @@ -84,7 +85,17 @@ public SearchResponseSections( SearchProfileShardResults profileResults, int numReducePhases ) { - this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList()); + this( + hits, + aggregations, + suggest, + timedOut, + terminatedEarly, + profileResults, + numReducePhases, + Collections.emptyList(), + Collections.emptyList() + ); } public SearchResponseSections( @@ -95,7 +106,8 @@ public SearchResponseSections( Boolean terminatedEarly, SearchProfileShardResults profileResults, int numReducePhases, - List searchExtBuilders + List searchExtBuilders, + List processorResult ) { this.hits = hits; this.aggregations = aggregations; @@ -104,6 +116,7 @@ public SearchResponseSections( this.timedOut = timedOut; this.terminatedEarly = terminatedEarly; this.numReducePhases = numReducePhases; + this.processorResult.addAll(processorResult); this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null")); } @@ -166,6 +179,10 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) } builder.endObject(); } + + if (!processorResult.isEmpty()) { + builder.field("processor_result", processorResult); + } return builder; } @@ -173,6 +190,10 @@ public List getSearchExtBuilders() { return Collections.unmodifiableList(this.searchExtBuilders); } + public List getProcessorResult() { + return processorResult; + } + protected void writeTo(StreamOutput out) throws IOException { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 05465e32631fd..8e2fa8246ac1b 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil if (request.hasParam("timeout")) { searchSourceBuilder.timeout(request.paramAsTime("timeout", null)); } + if (request.hasParam("verbose_pipeline")) { + searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false)); + } if (request.hasParam("terminate_after")) { int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER); if (terminateAfter < 0) { diff --git a/server/src/main/java/org/opensearch/search/SearchHits.java b/server/src/main/java/org/opensearch/search/SearchHits.java index 8232643b353f5..fab0c8fb59d9c 100644 --- a/server/src/main/java/org/opensearch/search/SearchHits.java +++ b/server/src/main/java/org/opensearch/search/SearchHits.java @@ -37,6 +37,7 @@ import org.apache.lucene.search.TotalHits.Relation; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.Lucene; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -166,6 +167,22 @@ public SearchHit[] getHits() { return this.hits; } + /** + * Creates a deep copy of this SearchHits instance. + * + * @return a deep copy of the current SearchHits object + * @throws IOException if an I/O exception occurs during serialization or deserialization + */ + public SearchHits deepCopy() throws IOException { + try (BytesStreamOutput out = new BytesStreamOutput()) { + this.writeTo(out); + + try (StreamInput in = out.bytes().streamInput()) { + return new SearchHits(in); + } + } + } + /** * Return the hit as the provided position. */ diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index 9c438401b9fbe..db8bea7a0d8d7 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -136,6 +136,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R public static final ParseField SLICE = new ParseField("slice"); public static final ParseField POINT_IN_TIME = new ParseField("pit"); public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline"); + public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline"); public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException { return fromXContent(parser, true); @@ -226,6 +227,8 @@ public static HighlightBuilder highlight() { private String searchPipeline; + private boolean verbosePipeline; + /** * Constructs a new search source builder. */ @@ -301,6 +304,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_2_18_0)) { searchPipeline = in.readOptionalString(); + verbosePipeline = in.readBoolean(); } } @@ -384,6 +388,7 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_2_18_0)) { out.writeOptionalString(searchPipeline); + out.writeOptionalBoolean(verbosePipeline); } } @@ -1142,6 +1147,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) { return this; } + /** + * Enables or disables verbose mode for the search pipeline. + * + * When verbose mode is enabled, detailed information about each processor + * in the search pipeline is included in the search response. This includes + * the processor name, execution status, input, output, and time taken for processing. + * + * This parameter is primarily intended for debugging purposes, allowing users + * to track how data flows and transforms through the search pipeline. + * + */ + public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) { + this.verbosePipeline = verbosePipeline; + return this; + } + + public Boolean verbosePipeline() { + return verbosePipeline; + } + /** * Rewrites this search source builder into its primitive form. e.g. by * rewriting the QueryBuilder. If the builder did not change the identity @@ -1240,6 +1265,7 @@ private SearchSourceBuilder shallowCopy( rewrittenBuilder.derivedFieldsObject = derivedFieldsObject; rewrittenBuilder.derivedFields = derivedFields; rewrittenBuilder.searchPipeline = searchPipeline; + rewrittenBuilder.verbosePipeline = verbosePipeline; return rewrittenBuilder; } @@ -1309,6 +1335,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th profile = parser.booleanValue(); } else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { searchPipeline = parser.text(); + } else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { + verbosePipeline = parser.booleanValue(); } else { throw new ParsingException( parser.getTokenLocation(), @@ -1920,7 +1948,8 @@ public int hashCode() { pointInTimeBuilder, derivedFieldsObject, derivedFields, - searchPipeline + searchPipeline, + verbosePipeline ); } @@ -1966,7 +1995,8 @@ public boolean equals(Object obj) { && Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder) && Objects.equals(derivedFieldsObject, other.derivedFieldsObject) && Objects.equals(derivedFields, other.derivedFields) - && Objects.equals(searchPipeline, other.searchPipeline); + && Objects.equals(searchPipeline, other.searchPipeline) + && Objects.equals(verbosePipeline, other.verbosePipeline); } @Override diff --git a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java index c9d7b0084c1e1..532575fe097fa 100644 --- a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java +++ b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java @@ -42,6 +42,7 @@ import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.suggest.Suggest; @@ -73,7 +74,17 @@ public InternalSearchResponse( Boolean terminatedEarly, int numReducePhases ) { - this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList()); + this( + hits, + aggregations, + suggest, + profileResults, + timedOut, + terminatedEarly, + numReducePhases, + Collections.emptyList(), + Collections.emptyList() + ); } public InternalSearchResponse( @@ -84,9 +95,20 @@ public InternalSearchResponse( boolean timedOut, Boolean terminatedEarly, int numReducePhases, - List searchExtBuilderList + List searchExtBuilderList, + List processorResult ) { - super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList); + super( + hits, + aggregations, + suggest, + timedOut, + terminatedEarly, + profileResults, + numReducePhases, + searchExtBuilderList, + processorResult + ); } public InternalSearchResponse(StreamInput in) throws IOException { @@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException { in.readOptionalBoolean(), in.readOptionalWriteable(SearchProfileShardResults::new), in.readVInt(), - readSearchExtBuildersOnOrAfter(in) + readSearchExtBuildersOnOrAfter(in), + readProcessorResultOnOrAfter(in) ); } @@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(profileResults); out.writeVInt(numReducePhases); writeSearchExtBuildersOnOrAfter(out, searchExtBuilders); + writeProcessorResultOnOrAfter(out, processorResult); } private static List readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException { @@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List readProcessorResultOnOrAfter(StreamInput in) throws IOException { + return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList(); + } + + private static void writeProcessorResultOnOrAfter(StreamOutput out, List processorResult) throws IOException { + if (out.getVersion().onOrAfter(Version.V_2_18_0)) { + out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o)); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index c88dfb2060393..4600cda4e993a 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -23,6 +23,7 @@ import org.opensearch.search.SearchPhaseResult; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -146,11 +147,18 @@ void transformRequest(SearchRequest request, ActionListener reque final ActionListener nextListener = currentListener; SearchRequestProcessor processor = searchRequestProcessors.get(i); currentListener = ActionListener.wrap(r -> { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); + detail.addInput(r.source().shallowCopy()); long start = relativeTimeSupplier.getAsLong(); beforeRequestProcessor(processor); processor.processRequestAsync(r, requestContext, ActionListener.wrap(rr -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterRequestProcessor(processor, took); + detail.addOutput(rr.source().shallowCopy()); + detail.addTook(took); + if (rr.source().verbosePipeline()) { + requestContext.addProcessorExecutionDetail(detail); + } nextListener.onResponse(rr); }, e -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); @@ -202,6 +210,14 @@ ActionListener transformResponseListener( ) { if (searchResponseProcessors.isEmpty()) { // No response transformation necessary + if (!requestContext.getProcessorExecutionDetails().isEmpty()) { + ActionListener finalResponseListener = responseListener; + return ActionListener.wrap(r -> { + List details = requestContext.getProcessorExecutionDetails(); + r.getInternalResponse().getProcessorResult().addAll(details); + finalResponseListener.onResponse(r); + }, responseListener::onFailure); + } return responseListener; } @@ -225,11 +241,19 @@ ActionListener transformResponseListener( final SearchResponseProcessor processor = searchResponseProcessors.get(i); responseListener = ActionListener.wrap(r -> { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); + detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits())); beforeResponseProcessor(processor); final long start = relativeTimeSupplier.getAsLong(); processor.processResponseAsync(request, r, requestContext, ActionListener.wrap(rr -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterResponseProcessor(processor, took); + detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits())); + detail.addTook(took); + if (request.source().verbosePipeline()) { + requestContext.addProcessorExecutionDetail(detail); + rr.getInternalResponse().getProcessorResult().add(detail); + } currentFinalListener.onResponse(rr); }, e -> { onResponseProcessorFailed(processor); @@ -254,6 +278,9 @@ ActionListener transformResponseListener( } final ActionListener chainListener = responseListener; return ActionListener.wrap(r -> { + // Adding all the request processor detail + List details = requestContext.getProcessorExecutionDetails(); + r.getInternalResponse().getProcessorResult().addAll(details); beforeTransformResponse(); pipelineStart[0] = relativeTimeSupplier.getAsLong(); chainListener.onResponse(r); diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java index a1f2b8b99d958..af30b3cbd3dd6 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java @@ -8,7 +8,10 @@ package org.opensearch.search.pipeline; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -17,6 +20,9 @@ public class PipelineProcessingContext { private final Map attributes = new HashMap<>(); + // Key for processor execution details + private static final String PROCESSOR_EXECUTION_DETAILS_KEY = "processorExecutionDetails"; + /** * Set a generic attribute in the state for this request. Overwrites any existing value. * @@ -35,4 +41,31 @@ public void setAttribute(String name, Object value) { public Object getAttribute(String name) { return attributes.get(name); } + + /** + * Add a ProcessorExecutionDetail to the list of execution details. + * + * @param detail the ProcessorExecutionDetail to add + */ + @SuppressWarnings("unchecked") + public void addProcessorExecutionDetail(ProcessorExecutionDetail detail) { + attributes.computeIfAbsent(PROCESSOR_EXECUTION_DETAILS_KEY, k -> new ArrayList()); + List details = (List) attributes.get(PROCESSOR_EXECUTION_DETAILS_KEY); + details.add(detail); + } + + /** + * Get all ProcessorExecutionDetails recorded in this context. + * + * @return an unmodifiable list of ProcessorExecutionDetails + */ + @SuppressWarnings("unchecked") + public List getProcessorExecutionDetails() { + Object details = attributes.get(PROCESSOR_EXECUTION_DETAILS_KEY); + if (details instanceof List) { + return Collections.unmodifiableList((List) details); + } + return Collections.emptyList(); + } + } diff --git a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java new file mode 100644 index 0000000000000..3ad00c373c8b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java @@ -0,0 +1,183 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Detailed information about a processor execution in a search pipeline. + * + * @opensearch.internal + */ +@PublicApi(since = "1.0.0") +public class ProcessorExecutionDetail implements Writeable, ToXContentObject { + + private final String processorName; + private long durationMillis; + private Object inputData; + private Object outputData; + private static final Logger logger = LogManager.getLogger(ProcessorExecutionDetail.class); + + /** + * Constructor for ProcessorExecutionDetail + */ + public ProcessorExecutionDetail(String processorName, long durationMillis, Object inputData, Object outputData) { + this.processorName = processorName; + this.durationMillis = durationMillis; + this.inputData = inputData; + this.outputData = outputData; + } + + public ProcessorExecutionDetail(String processorName) { + this(processorName, 0, null, null); + + } + + public ProcessorExecutionDetail(StreamInput in) throws IOException { + this.processorName = in.readString(); + this.durationMillis = in.readLong(); + this.inputData = in.readGenericValue(); + this.outputData = in.readGenericValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(processorName); + out.writeLong(durationMillis); + out.writeGenericValue(inputData); + out.writeGenericValue(outputData); + } + + public String getProcessorName() { + return processorName; + } + + public long getDurationMillis() { + return durationMillis; + } + + public Object getInputData() { + return inputData; + } + + public Object getOutputData() { + return outputData; + } + + /** + * Adds or updates the input data for this processor execution detail. + * + * @param inputData the new input data + */ + public void addInput(Object inputData) { + this.inputData = inputData; + } + + /** + * Adds or updates the output data for this processor execution detail. + * + * @param outputData the new output data + */ + public void addOutput(Object outputData) { + this.outputData = outputData; + } + + /** + * Adds or updates the duration of the processor execution. + * + * @param durationMillis the new duration in milliseconds + */ + public void addTook(long durationMillis) { + this.durationMillis = durationMillis; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("processor_name", processorName); + builder.field("duration_millis", durationMillis); + + addFieldToXContent(builder, "input_data", inputData, params); + + addFieldToXContent(builder, "output_data", outputData, params); + + builder.endObject(); + return builder; + } + + private void addFieldToXContent(XContentBuilder builder, String fieldName, Object data, Params params) throws IOException { + if (data == null) { + builder.nullField(fieldName); + return; + } + + if (data instanceof List) { + builder.startArray(fieldName); + for (Object item : (List) data) { + writeItemToXContent(builder, item, params); + } + builder.endArray(); + } else if (data instanceof ToXContentObject) { + builder.field(fieldName); + ((ToXContentObject) data).toXContent(builder, params); + } else { + builder.field(fieldName, data); + } + } + + private void writeItemToXContent(XContentBuilder builder, Object item, Params params) throws IOException { + if (item instanceof ToXContentObject) { + ((ToXContentObject) item).toXContent(builder, params); + } else { + builder.value(item); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ProcessorExecutionDetail that = (ProcessorExecutionDetail) o; + return durationMillis == that.durationMillis + && Objects.equals(processorName, that.processorName) + && Objects.equals(inputData, that.inputData) + && Objects.equals(outputData, that.outputData); + } + + @Override + public int hashCode() { + return Objects.hash(processorName, durationMillis, inputData, outputData); + } + + @Override + public String toString() { + return "ProcessorExecutionDetail{" + + "processorName='" + + processorName + + '\'' + + ", durationMillis=" + + durationMillis + + ", inputData=" + + inputData + + ", outputData=" + + outputData + + '}'; + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java index c9e59ab4ea04d..228773d04ab2c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java @@ -176,7 +176,8 @@ public SearchResponse createTestItem( timedOut, terminatedEarly, numReducePhases, - searchExtBuilders + searchExtBuilders, + Collections.emptyList() ); } else { internalSearchResponse = InternalSearchResponse.empty(); @@ -321,7 +322,8 @@ public void testToXContent() { false, null, 1, - List.of(new DummySearchExtBuilder(dummyId)) + List.of(new DummySearchExtBuilder(dummyId)), + Collections.emptyList() ), null, 0, diff --git a/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java b/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java index 8fb1814962155..8d910d4cc57d3 100644 --- a/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java +++ b/server/src/test/java/org/opensearch/search/GenericSearchExtBuilderTests.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -251,7 +252,8 @@ public SearchResponse createTestItem( timedOut, terminatedEarly, numReducePhases, - searchExtBuilders + searchExtBuilders, + Collections.emptyList() ); } else { internalSearchResponse = InternalSearchResponse.empty(); diff --git a/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java b/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java index da8ccc9e121e0..90962a5c613f1 100644 --- a/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java +++ b/server/src/test/java/org/opensearch/search/builder/SearchSourceBuilderTests.java @@ -703,6 +703,30 @@ public void testParseFromAndSize() throws IOException { } } + public void testVerbosePipeline() throws IOException { + { + String restContent = "{ \"verbose_pipeline\": true }"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser); + assertTrue(searchSourceBuilder.verbosePipeline()); + } + } + { + String restContent = "{ \"verbose_pipeline\": false }"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser); + assertFalse(searchSourceBuilder.verbosePipeline()); + } + } + { + String restContent = "{ \"query\": { \"match_all\": {} } }"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser); + assertFalse(searchSourceBuilder.verbosePipeline()); + } + } + } + private void assertIndicesBoostParseErrorMessage(String restContent, String expectedErrorMessage) throws IOException { try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) { ParsingException e = expectThrows(ParsingException.class, () -> SearchSourceBuilder.fromXContent(parser)); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index b52205996f34b..99461bdf5c3be 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -669,12 +669,14 @@ public void testTransformResponse() throws Exception { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(createDefaultSearchSourceBuilder()); PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse notTransformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertSame(searchResponse, notTransformedResponse); // Now apply a pipeline searchRequest = new SearchRequest().pipeline("p1"); + searchRequest.source(createDefaultSearchSourceBuilder()); pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertEquals(size, transformedResponse.getHits().getHits().length); @@ -1105,7 +1107,7 @@ public void testExceptionOnResponseProcessing() throws Exception { PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + SearchResponse response = createDefaultSearchResponse(); // Exception thrown when processing response expectThrows(SearchPipelineProcessingException.class, () -> syncTransformResponse(pipelinedRequest, response)); } @@ -1169,7 +1171,7 @@ public void testCatchExceptionOnResponseProcessing() throws Exception { PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + SearchResponse response = createDefaultSearchResponse(); // Caught Exception thrown when processing response and produced warn level logging message try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(Pipeline.class))) { @@ -1209,7 +1211,8 @@ public void testStats() throws Exception { SearchPipelineService searchPipelineService = getSearchPipelineService(requestProcessors, responseProcessors); SearchRequest request = new SearchRequest(); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + request.source(createDefaultSearchSourceBuilder()); + SearchResponse response = createDefaultSearchResponse(); syncExecutePipeline( searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), @@ -1307,7 +1310,8 @@ public void testStatsEnabledIgnoreFailure() throws Exception { ); SearchRequest request = new SearchRequest(); - SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + request.source(createDefaultSearchSourceBuilder()); + SearchResponse response = createDefaultSearchResponse(); syncExecutePipeline( searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), @@ -1579,8 +1583,9 @@ public void testStatefulProcessors() throws Exception { searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1"), indexNameExpressionResolver); + request.source(createDefaultSearchSourceBuilder()); assertNull(contextHolder.get()); - syncExecutePipeline(request, new SearchResponse(null, null, 0, 0, 0, 0, null, null)); + syncExecutePipeline(request, createDefaultSearchResponse()); assertNotNull(contextHolder.get()); assertEquals("b", contextHolder.get()); } @@ -1757,4 +1762,71 @@ public void testInvalidIndexResolveIndexDefaultPipeline() throws Exception { assertEquals(5, pipelinedRequest.source().size()); } + public void testVerbosePipelineExecution() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "verbose_pipeline", + new PipelineConfiguration( + "verbose_pipeline", + new BytesArray( + "{" + + "\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ]," + + "\"response_processors\": [ { \"fixed_score\": { \"score\": 5.0 } } ]" + + "}" + ), + MediaTypeRegistry.JSON + ) + ) + ); + + ClusterState initialState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState updatedState = ClusterState.builder(initialState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + searchPipelineService.applyClusterState(new ClusterChangedEvent("clusterStateUpdated", updatedState, initialState)); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(10)).pipeline("verbose_pipeline"); + searchRequest.source().verbosePipeline(true); + + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + + SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f), + null, + null, + false, + null, + null, + 1, + List.of(), + List.of() + ); + + SearchResponse searchResponse = new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); + List executionDetails = transformedResponse.getInternalResponse().getProcessorResult(); + + assertNotNull(executionDetails); + assertEquals(2, executionDetails.size()); + assertEquals("scale_request_size", executionDetails.get(0).getProcessorName()); + assertEquals("fixed_score", executionDetails.get(1).getProcessorName()); + } + + private SearchSourceBuilder createDefaultSearchSourceBuilder() { + return SearchSourceBuilder.searchSource().size(10); + } + + private SearchResponse createDefaultSearchResponse() { + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f); + + SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, null, null, 1, List.of(), List.of()); + + return new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + } + } From d931750f3f246d3b8762a9965e6fec2368b8a252 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Thu, 12 Dec 2024 16:06:24 -0800 Subject: [PATCH 2/8] add change log Signed-off-by: Junwei Dai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b76a3d50cb0d..868e845ef15c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)). - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) - Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)). +- Add `verbose_pipeline` parameter to output each processor's execution details ([#14745](https://github.com/opensearch-project/OpenSearch/pull/14745)). - Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678)) ### Dependencies @@ -67,7 +68,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702)) - Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644)) - Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) -- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606)) ### Security From 1f879c10694d9dba304d99771cf319ff0c26017a Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Thu, 12 Dec 2024 16:32:02 -0800 Subject: [PATCH 3/8] Add merge missing change log Signed-off-by: Junwei Dai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 868e845ef15c8..69b0b9081ef00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,7 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702)) - Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644)) - Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) - +- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606)) ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.18...2.x From 488377f28ec30ae149488e05630ad95b367475d0 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Thu, 12 Dec 2024 17:06:04 -0800 Subject: [PATCH 4/8] Add missing line Signed-off-by: Junwei Dai --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69b0b9081ef00..7367bb5b9870b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644)) - Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) - Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606)) + ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.18...2.x From e4e30f5c8b9697533362df1dc4c2caae30afd070 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Thu, 12 Dec 2024 19:04:47 -0800 Subject: [PATCH 5/8] Refactor ProcessorExecutionDetail to improve field handling Signed-off-by: Junwei Dai --- .../action/search/SearchResponse.java | 6 ++ .../action/search/SearchResponseSections.java | 3 +- .../pipeline/ProcessorExecutionDetail.java | 78 ++++++++++++++++--- 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 90d8232a82c8b..0d55fbf2e7f88 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -74,6 +74,7 @@ import java.util.function.Supplier; import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD; +import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -519,6 +520,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE extBuilders.add(searchExtBuilder); } } + } else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + while ((token = parser.nextToken()) != Token.END_ARRAY) { + ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser); + processorResult.add(detail); + } } else { parser.skipChildren(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java index fa4b0030148f5..5eb305d91ee04 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java @@ -66,6 +66,7 @@ public class SearchResponseSections implements ToXContentFragment { public static final ParseField EXT_FIELD = new ParseField("ext"); + public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results"); protected final SearchHits hits; protected final Aggregations aggregations; protected final Suggest suggest; @@ -181,7 +182,7 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) } if (!processorResult.isEmpty()) { - builder.field("processor_result", processorResult); + builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult); } return builder; } diff --git a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java index 3ad00c373c8b5..81d906b075097 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java +++ b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java @@ -8,16 +8,17 @@ package org.opensearch.search.pipeline; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -33,7 +34,10 @@ public class ProcessorExecutionDetail implements Writeable, ToXContentObject { private long durationMillis; private Object inputData; private Object outputData; - private static final Logger logger = LogManager.getLogger(ProcessorExecutionDetail.class); + public static final ParseField PROCESSOR_NAME_FIELD = new ParseField("processor_name"); + public static final ParseField DURATION_MILLIS_FIELD = new ParseField("duration_millis"); + public static final ParseField INPUT_DATA_FIELD = new ParseField("input_data"); + public static final ParseField OUTPUT_DATA_FIELD = new ParseField("output_data"); /** * Constructor for ProcessorExecutionDetail @@ -111,13 +115,10 @@ public void addTook(long durationMillis) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("processor_name", processorName); - builder.field("duration_millis", durationMillis); - - addFieldToXContent(builder, "input_data", inputData, params); - - addFieldToXContent(builder, "output_data", outputData, params); - + builder.field(PROCESSOR_NAME_FIELD.getPreferredName(), processorName); + builder.field(DURATION_MILLIS_FIELD.getPreferredName(), durationMillis); + addFieldToXContent(builder, INPUT_DATA_FIELD.getPreferredName(), inputData, params); + addFieldToXContent(builder, OUTPUT_DATA_FIELD.getPreferredName(), outputData, params); builder.endObject(); return builder; } @@ -161,6 +162,63 @@ public boolean equals(Object o) { && Objects.equals(outputData, that.outputData); } + public static ProcessorExecutionDetail fromXContent(XContentParser parser) throws IOException { + String processorName = null; + long durationMillis = 0; + Object inputData = null; + Object outputData = null; + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + if (PROCESSOR_NAME_FIELD.match(fieldName, parser.getDeprecationHandler())) { + processorName = parser.text(); + } else if (DURATION_MILLIS_FIELD.match(fieldName, parser.getDeprecationHandler())) { + durationMillis = parser.longValue(); + } else if (INPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) { + inputData = parseFieldFromXContent(parser); + } else if (OUTPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) { + outputData = parseFieldFromXContent(parser); + } else { + parser.skipChildren(); + } + } + + if (processorName == null) { + throw new IllegalArgumentException("Processor name is required"); + } + + return new ProcessorExecutionDetail(processorName, durationMillis, inputData, outputData); + } + + private static Object parseFieldFromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token == XContentParser.Token.VALUE_NULL) { + return null; + } else if (token == XContentParser.Token.START_ARRAY) { + return parseArrayFromXContent(parser); + } else if (token == XContentParser.Token.START_OBJECT) { + return parser.map(); + } else { + return parser.textOrNull(); + } + } + + private static List parseArrayFromXContent(XContentParser parser) throws IOException { + List list = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + list.add(parser.map()); + } else if (parser.currentToken() == XContentParser.Token.START_ARRAY) { + list.add(parseArrayFromXContent(parser)); + } else { + list.add(parser.textOrNull()); + } + } + return list; + } + @Override public int hashCode() { return Objects.hash(processorName, durationMillis, inputData, outputData); From 615a4b6ae7a2c8320e9cb157e9493243c66a3a80 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Tue, 17 Dec 2024 13:22:30 -0800 Subject: [PATCH 6/8] Fix ITtest Fail Signed-off-by: Junwei Dai --- .../org/opensearch/search/builder/SearchSourceBuilder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index db8bea7a0d8d7..301e195dda672 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -304,6 +304,8 @@ public SearchSourceBuilder(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_2_18_0)) { searchPipeline = in.readOptionalString(); + } + if (in.getVersion().onOrAfter(Version.CURRENT)) { verbosePipeline = in.readBoolean(); } } @@ -388,6 +390,8 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_2_18_0)) { out.writeOptionalString(searchPipeline); + } + if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeOptionalBoolean(verbosePipeline); } } From 0ae4d06fe3617936a08b20ddf8263bf1d2865e40 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Wed, 18 Dec 2024 11:31:30 -0800 Subject: [PATCH 7/8] Add more unit test Signed-off-by: Junwei Dai --- .../pipeline/ProcessorExecutionDetail.java | 7 +- .../action/search/SearchResponseTests.java | 23 +++- .../ProcessorExecutionDetailTests.java | 122 ++++++++++++++++++ .../pipeline/SearchPipelineServiceTests.java | 100 ++++++++++++++ 4 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java diff --git a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java index 81d906b075097..2f4a044c513e0 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java +++ b/server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Objects; +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + /** * Detailed information about a processor execution in a search pipeline. * @@ -167,7 +169,10 @@ public static ProcessorExecutionDetail fromXContent(XContentParser parser) throw long durationMillis = 0; Object inputData = null; Object outputData = null; - + if (parser.currentToken() != XContentParser.Token.START_OBJECT) { + parser.nextToken(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + } while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = parser.currentName(); parser.nextToken(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java index 228773d04ab2c..48c206719cc28 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java @@ -62,6 +62,7 @@ import org.opensearch.search.aggregations.AggregationsTests; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.search.pipeline.ProcessorExecutionDetail; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.profile.SearchProfileShardResultsTests; import org.opensearch.search.suggest.Suggest; @@ -312,6 +313,10 @@ public void testToXContent() { hit.score(2.0f); SearchHit[] hits = new SearchHit[] { hit }; String dummyId = UUID.randomUUID().toString(); + List processorResults = List.of( + new ProcessorExecutionDetail("processor1", 50, List.of(1), List.of(1)), + new ProcessorExecutionDetail("processor2", 30, List.of(3), List.of(3)) + ); { SearchResponse response = new SearchResponse( new InternalSearchResponse( @@ -323,7 +328,7 @@ public void testToXContent() { null, 1, List.of(new DummySearchExtBuilder(dummyId)), - Collections.emptyList() + processorResults ), null, 0, @@ -356,8 +361,24 @@ public void testToXContent() { { expectedString.append("{\"dummy\":\"" + dummyId + "\"}"); } + expectedString.append(",\"processor_results\":"); + expectedString.append("["); + for (int i = 0; i < processorResults.size(); i++) { + ProcessorExecutionDetail detail = processorResults.get(i); + expectedString.append("{"); + expectedString.append("\"processor_name\":\"").append(detail.getProcessorName()).append("\","); + expectedString.append("\"duration_millis\":").append(detail.getDurationMillis()).append(","); + expectedString.append("\"input_data\":").append(detail.getInputData()).append(","); + expectedString.append("\"output_data\":").append(detail.getOutputData()); + expectedString.append("}"); + if (i < processorResults.size() - 1) { + expectedString.append(","); + } + } + expectedString.append("]"); } expectedString.append("}"); + assertEquals(expectedString.toString(), Strings.toString(MediaTypeRegistry.JSON, response)); List searchExtBuilders = response.getInternalResponse().getSearchExtBuilders(); assertEquals(1, searchExtBuilders.size()); diff --git a/server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java b/server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java new file mode 100644 index 0000000000000..76ee99fc2f201 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/ProcessorExecutionDetailTests.java @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ProcessorExecutionDetailTests extends OpenSearchTestCase { + + public void testSerializationRoundtrip() throws IOException { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail("testProcessor", 123L, Map.of("key", "value"), List.of(1, 2, 3)); + ProcessorExecutionDetail deserialized; + try (BytesStreamOutput output = new BytesStreamOutput()) { + detail.writeTo(output); + try (StreamInput input = output.bytes().streamInput()) { + deserialized = new ProcessorExecutionDetail(input); + } + } + assertEquals("testProcessor", deserialized.getProcessorName()); + assertEquals(123L, deserialized.getDurationMillis()); + assertEquals(Map.of("key", "value"), deserialized.getInputData()); + assertEquals(List.of(1, 2, 3), deserialized.getOutputData()); + } + + public void testAddMethods() { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail("testProcessor"); + detail.addTook(456L); + detail.addInput(Map.of("newKey", "newValue")); + detail.addOutput(List.of(4, 5, 6)); + assertEquals(456L, detail.getDurationMillis()); + assertEquals(Map.of("newKey", "newValue"), detail.getInputData()); + assertEquals(List.of(4, 5, 6), detail.getOutputData()); + } + + public void testEqualsAndHashCode() { + ProcessorExecutionDetail detail1 = new ProcessorExecutionDetail("processor1", 100L, "input1", "output1"); + ProcessorExecutionDetail detail2 = new ProcessorExecutionDetail("processor1", 100L, "input1", "output1"); + ProcessorExecutionDetail detail3 = new ProcessorExecutionDetail("processor2", 200L, "input2", "output2"); + + assertEquals(detail1, detail2); + assertNotEquals(detail1, detail3); + assertEquals(detail1.hashCode(), detail2.hashCode()); + assertNotEquals(detail1.hashCode(), detail3.hashCode()); + } + + public void testToString() { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail("processorZ", 500L, "inputData", "outputData"); + String expected = + "ProcessorExecutionDetail{processorName='processorZ', durationMillis=500, inputData=inputData, outputData=outputData}"; + assertEquals(expected, detail.toString()); + } + + public void testToXContent() throws IOException { + ProcessorExecutionDetail detail = new ProcessorExecutionDetail("testProcessor", 123L, Map.of("key1", "value1"), List.of(1, 2, 3)); + + XContentBuilder actualBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + detail.toXContent(actualBuilder, ToXContent.EMPTY_PARAMS); + + String expected = "{" + + " \"processor_name\": \"testProcessor\"," + + " \"duration_millis\": 123," + + " \"input_data\": {\"key1\": \"value1\"}," + + " \"output_data\": [1, 2, 3]" + + "}"; + + XContentParser expectedParser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + expected + ); + XContentBuilder expectedBuilder = XContentBuilder.builder(JsonXContent.jsonXContent); + expectedBuilder.generator().copyCurrentStructure(expectedParser); + + assertEquals( + XContentHelper.convertToMap(BytesReference.bytes(expectedBuilder), false, (MediaType) MediaTypeRegistry.JSON), + XContentHelper.convertToMap(BytesReference.bytes(actualBuilder), false, (MediaType) MediaTypeRegistry.JSON) + ); + } + + public void testFromXContent() throws IOException { + String json = "{" + + " \"processor_name\": \"testProcessor\"," + + " \"duration_millis\": 123," + + " \"input_data\": {\"key1\": \"value1\"}," + + " \"output_data\": [1, 2, 3]" + + "}"; + + try ( + XContentParser parser = JsonXContent.jsonXContent.createParser( + this.xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + json + ) + ) { + ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser); + + assertEquals("testProcessor", detail.getProcessorName()); + assertEquals(123L, detail.getDurationMillis()); + assertEquals(Map.of("key1", "value1"), detail.getInputData()); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 99461bdf5c3be..d5842351e0b74 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -1817,6 +1817,106 @@ public void testVerbosePipelineExecution() throws Exception { assertEquals("fixed_score", executionDetails.get(1).getProcessorName()); } + public void testVerbosePipelineWithRequestProcessorOnly() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "request_only_pipeline", + new PipelineConfiguration( + "request_only_pipeline", + new BytesArray("{" + "\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ]" + "}"), + MediaTypeRegistry.JSON + ) + ) + ); + + ClusterState initialState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState updatedState = ClusterState.builder(initialState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + searchPipelineService.applyClusterState(new ClusterChangedEvent("clusterStateUpdated", updatedState, initialState)); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(10)) + .pipeline("request_only_pipeline"); + searchRequest.source().verbosePipeline(true); + + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + + SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f), + null, + null, + false, + null, + null, + 1, + List.of(), + List.of() + ); + + SearchResponse searchResponse = new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); + List executionDetails = transformedResponse.getInternalResponse().getProcessorResult(); + + assertNotNull(executionDetails); + assertEquals(1, executionDetails.size()); + assertEquals("scale_request_size", executionDetails.get(0).getProcessorName()); + } + + public void testVerbosePipelineWithResponseProcessorOnly() throws Exception { + SearchPipelineService searchPipelineService = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "response_only_pipeline", + new PipelineConfiguration( + "response_only_pipeline", + new BytesArray("{" + "\"response_processors\": [ { \"fixed_score\": { \"score\": 5.0 } } ]" + "}"), + MediaTypeRegistry.JSON + ) + ) + ); + + ClusterState initialState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState updatedState = ClusterState.builder(initialState) + .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + searchPipelineService.applyClusterState(new ClusterChangedEvent("clusterStateUpdated", updatedState, initialState)); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(10)) + .pipeline("response_only_pipeline"); + searchRequest.source().verbosePipeline(true); + + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); + + SearchResponseSections sections = new SearchResponseSections( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f), + null, + null, + false, + null, + null, + 1, + List.of(), + List.of() + ); + + SearchResponse searchResponse = new SearchResponse(sections, null, 0, 0, 0, 0, null, null); + SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); + List executionDetails = transformedResponse.getInternalResponse().getProcessorResult(); + + assertNotNull(executionDetails); + assertEquals(1, executionDetails.size()); + assertEquals("fixed_score", executionDetails.get(0).getProcessorName()); + } + private SearchSourceBuilder createDefaultSearchSourceBuilder() { return SearchSourceBuilder.searchSource().size(10); } From 500ac7ca6c815acdd9f0f1358c503664654f8810 Mon Sep 17 00:00:00 2001 From: Junwei Dai Date: Thu, 26 Dec 2024 09:57:01 -0800 Subject: [PATCH 8/8] resolve comments Signed-off-by: Junwei Dai --- CHANGELOG.md | 2 +- .../search/builder/SearchSourceBuilder.java | 2 +- .../internal/InternalSearchResponse.java | 6 +++--- .../opensearch/search/pipeline/Pipeline.java | 18 +++++++++++------- .../pipeline/PipelineProcessingContext.java | 8 +++++--- 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7367bb5b9870b..d27f0fa592868 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)). - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) - Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)). -- Add `verbose_pipeline` parameter to output each processor's execution details ([#14745](https://github.com/opensearch-project/OpenSearch/pull/14745)). +- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)). - Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index 301e195dda672..ebd3a3cb9b92b 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -1167,7 +1167,7 @@ public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) { return this; } - public Boolean verbosePipeline() { + public boolean verbosePipeline() { return verbosePipeline; } diff --git a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java index 532575fe097fa..0d0247542345d 100644 --- a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java +++ b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java @@ -149,12 +149,12 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List readProcessorResultOnOrAfter(StreamInput in) throws IOException { - return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList(); + return (in.getVersion().onOrAfter(Version.CURRENT)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList(); } private static void writeProcessorResultOnOrAfter(StreamOutput out, List processorResult) throws IOException { - if (out.getVersion().onOrAfter(Version.V_2_18_0)) { - out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o)); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeList(processorResult); } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index 4600cda4e993a..c0794f42dfc79 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -148,15 +148,17 @@ void transformRequest(SearchRequest request, ActionListener reque SearchRequestProcessor processor = searchRequestProcessors.get(i); currentListener = ActionListener.wrap(r -> { ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); - detail.addInput(r.source().shallowCopy()); + if (r.source().verbosePipeline()) { + detail.addInput(r.source().shallowCopy()); + } long start = relativeTimeSupplier.getAsLong(); beforeRequestProcessor(processor); processor.processRequestAsync(r, requestContext, ActionListener.wrap(rr -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterRequestProcessor(processor, took); - detail.addOutput(rr.source().shallowCopy()); - detail.addTook(took); if (rr.source().verbosePipeline()) { + detail.addOutput(rr.source().shallowCopy()); + detail.addTook(took); requestContext.addProcessorExecutionDetail(detail); } nextListener.onResponse(rr); @@ -210,7 +212,7 @@ ActionListener transformResponseListener( ) { if (searchResponseProcessors.isEmpty()) { // No response transformation necessary - if (!requestContext.getProcessorExecutionDetails().isEmpty()) { + if (request.source() != null && request.source().verbosePipeline()) { ActionListener finalResponseListener = responseListener; return ActionListener.wrap(r -> { List details = requestContext.getProcessorExecutionDetails(); @@ -242,15 +244,17 @@ ActionListener transformResponseListener( responseListener = ActionListener.wrap(r -> { ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); - detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits())); + if (request.source().verbosePipeline()) { + detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits())); + } beforeResponseProcessor(processor); final long start = relativeTimeSupplier.getAsLong(); processor.processResponseAsync(request, r, requestContext, ActionListener.wrap(rr -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterResponseProcessor(processor, took); - detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits())); - detail.addTook(took); if (request.source().verbosePipeline()) { + detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits())); + detail.addTook(took); requestContext.addProcessorExecutionDetail(detail); rr.getInternalResponse().getProcessorResult().add(detail); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java index af30b3cbd3dd6..52f29011725cb 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java @@ -47,10 +47,12 @@ public Object getAttribute(String name) { * * @param detail the ProcessorExecutionDetail to add */ - @SuppressWarnings("unchecked") public void addProcessorExecutionDetail(ProcessorExecutionDetail detail) { - attributes.computeIfAbsent(PROCESSOR_EXECUTION_DETAILS_KEY, k -> new ArrayList()); - List details = (List) attributes.get(PROCESSOR_EXECUTION_DETAILS_KEY); + @SuppressWarnings("unchecked") + List details = (List) attributes.computeIfAbsent( + PROCESSOR_EXECUTION_DETAILS_KEY, + k -> new ArrayList<>() + ); details.add(detail); }