Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add verbose pipeline parameter to output each processor's execution details #16843

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add `verbose_pipeline` parameter to output each processor's execution details ([#14745](https://github.com/opensearch-project/OpenSearch/pull/14745)).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link has to point to PR #16843, current one is for the issue

- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand All @@ -73,6 +74,7 @@
import java.util.function.Supplier;

import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD;
import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand Down Expand Up @@ -394,6 +396,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
List<SearchExtBuilder> extBuilders = new ArrayList<>();
List<ProcessorExecutionDetail> processorResult = new ArrayList<>();
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -517,6 +520,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
extBuilders.add(searchExtBuilder);
}
}
} else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_ARRAY) {
ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser);
processorResult.add(detail);
}
} else {
parser.skipChildren();
}
Expand All @@ -530,7 +538,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
terminatedEarly,
profile,
numReducePhases,
extBuilders
extBuilders,
processorResult
);
return new SearchResponse(
searchResponseSections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand All @@ -65,7 +66,7 @@
public class SearchResponseSections implements ToXContentFragment {

public static final ParseField EXT_FIELD = new ParseField("ext");

public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results");
protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
Expand All @@ -74,6 +75,7 @@ public class SearchResponseSections implements ToXContentFragment {
protected final Boolean terminatedEarly;
protected final int numReducePhases;
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
protected final List<ProcessorExecutionDetail> processorResult = new ArrayList<>();

public SearchResponseSections(
SearchHits hits,
Expand All @@ -84,7 +86,17 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases
) {
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public SearchResponseSections(
Expand All @@ -95,7 +107,8 @@ public SearchResponseSections(
Boolean terminatedEarly,
SearchProfileShardResults profileResults,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilders
List<SearchExtBuilder> searchExtBuilders,
List<ProcessorExecutionDetail> processorResult
) {
this.hits = hits;
this.aggregations = aggregations;
Expand All @@ -104,6 +117,7 @@ public SearchResponseSections(
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.processorResult.addAll(processorResult);
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
}

Expand Down Expand Up @@ -166,13 +180,21 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}

if (!processorResult.isEmpty()) {
builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult);
}
return builder;
}

public List<SearchExtBuilder> getSearchExtBuilders() {
return Collections.unmodifiableList(this.searchExtBuilders);
}

public List<ProcessorExecutionDetail> getProcessorResult() {
return processorResult;
}

protected void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
if (request.hasParam("timeout")) {
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
}
if (request.hasParam("verbose_pipeline")) {
searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false));
}
if (request.hasParam("terminate_after")) {
int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchHits.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -166,6 +167,22 @@ public SearchHit[] getHits() {
return this.hits;
}

/**
* Creates a deep copy of this SearchHits instance.
*
* @return a deep copy of the current SearchHits object
* @throws IOException if an I/O exception occurs during serialization or deserialization
*/
public SearchHits deepCopy() throws IOException {
junweid62 marked this conversation as resolved.
Show resolved Hide resolved
try (BytesStreamOutput out = new BytesStreamOutput()) {
this.writeTo(out);

try (StreamInput in = out.bytes().streamInput()) {
return new SearchHits(in);
}
}
}

/**
* Return the hit as the provided position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField SLICE = new ParseField("slice");
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");
public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline");

public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
return fromXContent(parser, true);
Expand Down Expand Up @@ -226,6 +227,8 @@ public static HighlightBuilder highlight() {

private String searchPipeline;

private boolean verbosePipeline;

/**
* Constructs a new search source builder.
*/
Expand Down Expand Up @@ -302,6 +305,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
searchPipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to have exact version here, CURRENT will change with every next release

verbosePipeline = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -385,6 +391,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(searchPipeline);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as in previous comment

out.writeOptionalBoolean(verbosePipeline);
}
}

/**
Expand Down Expand Up @@ -1142,6 +1151,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) {
return this;
}

/**
* Enables or disables verbose mode for the search pipeline.
*
* When verbose mode is enabled, detailed information about each processor
* in the search pipeline is included in the search response. This includes
* the processor name, execution status, input, output, and time taken for processing.
*
* This parameter is primarily intended for debugging purposes, allowing users
* to track how data flows and transforms through the search pipeline.
*
*/
public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) {
this.verbosePipeline = verbosePipeline;
return this;
}

public Boolean verbosePipeline() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return type is a wrapper around primitive Boolean?

return verbosePipeline;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1240,6 +1269,7 @@ private SearchSourceBuilder shallowCopy(
rewrittenBuilder.derivedFieldsObject = derivedFieldsObject;
rewrittenBuilder.derivedFields = derivedFields;
rewrittenBuilder.searchPipeline = searchPipeline;
rewrittenBuilder.verbosePipeline = verbosePipeline;
return rewrittenBuilder;
}

Expand Down Expand Up @@ -1309,6 +1339,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
profile = parser.booleanValue();
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipeline = parser.text();
} else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
verbosePipeline = parser.booleanValue();
} else {
throw new ParsingException(
parser.getTokenLocation(),
Expand Down Expand Up @@ -1920,7 +1952,8 @@ public int hashCode() {
pointInTimeBuilder,
derivedFieldsObject,
derivedFields,
searchPipeline
searchPipeline,
verbosePipeline
);
}

Expand Down Expand Up @@ -1966,7 +1999,8 @@ public boolean equals(Object obj) {
&& Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder)
&& Objects.equals(derivedFieldsObject, other.derivedFieldsObject)
&& Objects.equals(derivedFields, other.derivedFields)
&& Objects.equals(searchPipeline, other.searchPipeline);
&& Objects.equals(searchPipeline, other.searchPipeline)
&& Objects.equals(verbosePipeline, other.verbosePipeline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;

Expand Down Expand Up @@ -73,7 +74,17 @@ public InternalSearchResponse(
Boolean terminatedEarly,
int numReducePhases
) {
this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
profileResults,
timedOut,
terminatedEarly,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public InternalSearchResponse(
Expand All @@ -84,9 +95,20 @@ public InternalSearchResponse(
boolean timedOut,
Boolean terminatedEarly,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilderList
List<SearchExtBuilder> searchExtBuilderList,
List<ProcessorExecutionDetail> processorResult
) {
super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList);
super(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
searchExtBuilderList,
processorResult
);
}

public InternalSearchResponse(StreamInput in) throws IOException {
Expand All @@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException {
in.readOptionalBoolean(),
in.readOptionalWriteable(SearchProfileShardResults::new),
in.readVInt(),
readSearchExtBuildersOnOrAfter(in)
readSearchExtBuildersOnOrAfter(in),
readProcessorResultOnOrAfter(in)
);
}

Expand All @@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(profileResults);
out.writeVInt(numReducePhases);
writeSearchExtBuildersOnOrAfter(out, searchExtBuilders);
writeProcessorResultOnOrAfter(out, processorResult);
}

private static List<SearchExtBuilder> readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException {
Expand All @@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
out.writeNamedWriteableList(searchExtBuilders);
}
}

private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
}

private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o));
junweid62 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Josh's comment, please use out.writeList()

}
}

}
Loading
Loading