Skip to content

Commit

Permalink
Request level latency tracking
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Oct 3, 2023
1 parent beb25b1 commit 65c3905
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -662,6 +663,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ public static void readMultiLineFormat(
} else if ("cancel_after_time_interval".equals(entry.getKey())
|| "cancelAfterTimeInterval".equals(entry.getKey())) {
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
} else if ("phase_took".equals(entry.getKey())) {
searchRequest.setPhaseTook(SearchRequest.parseParamValue(value));
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
}
Expand Down Expand Up @@ -374,6 +376,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
if (request.getCancelAfterTimeInterval() != null) {
xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep());
}
if (request.isPhaseTook() != null) {
xContentBuilder.field("phase_took", request.isPhaseTook());
}
xContentBuilder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private String pipeline;

private Boolean phaseTook = null;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -209,6 +211,7 @@ private SearchRequest(
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTook = searchRequest.phaseTook;
}

/**
Expand Down Expand Up @@ -253,6 +256,9 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
pipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
phaseTook = in.readOptionalBoolean();
}
}

@Override
Expand Down Expand Up @@ -284,6 +290,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalString(pipeline);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(phaseTook);
}
}

@Override
Expand Down Expand Up @@ -615,6 +624,43 @@ public void setPreFilterShardSize(int preFilterShardSize) {
this.preFilterShardSize = preFilterShardSize;
}

enum ParamValue {
TRUE,
FALSE,
UNSET
}

public static Boolean parseParamValue(Object str) {
if ("TRUE".equals(str)) {
return true;
} else if ("FALSE".equals(str)) {
return false;
} else {
return null;
}
}

/**
* Returns value of user-provided phase_took query parameter for this search request.
* Defaults to <code>false</code>.
*/
public ParamValue isPhaseTook() {
if (phaseTook == null) {
return ParamValue.UNSET;
} else if (phaseTook == true) {
return ParamValue.TRUE;
} else {
return ParamValue.FALSE;
}
}

/**
* Sets value of phase_took query param if provided by user. Defaults to <code>null</code>.
*/
public void setPhaseTook(Boolean phaseTook) {
this.phaseTook = phaseTook;
}

/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold, or <code>null</code> if the threshold is unspecified.
Expand Down Expand Up @@ -719,7 +765,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline);
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook);
}

@Override
Expand All @@ -740,7 +787,8 @@ public int hashCode() {
localClusterAlias,
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval
cancelAfterTimeInterval,
phaseTook
);
}

Expand Down Expand Up @@ -783,6 +831,8 @@ public String toString() {
+ cancelAfterTimeInterval
+ ", pipeline="
+ pipeline
+ ", phaseTook="
+ phaseTook
+ "}";
}
}
133 changes: 132 additions & 1 deletion server/src/main/java/org/opensearch/action/search/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.search;

import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.StatusToXContentObject;
Expand Down Expand Up @@ -63,7 +64,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
Expand Down Expand Up @@ -94,6 +97,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
private final PhaseTook phaseTook;

public SearchResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -112,6 +116,11 @@ public SearchResponse(StreamInput in) throws IOException {
clusters = new Clusters(in);
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
phaseTook = in.readOptionalWriteable(PhaseTook::new);
} else {
phaseTook = null;
}
skippedShards = in.readVInt();
pointInTimeId = in.readOptionalString();
}
Expand All @@ -126,7 +135,32 @@ public SearchResponse(
ShardSearchFailure[] shardFailures,
Clusters clusters
) {
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null);
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, null, shardFailures, clusters, null);
}

public SearchResponse(
SearchResponseSections internalResponse,
String scrollId,
int totalShards,
int successfulShards,
int skippedShards,
long tookInMillis,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
) {
this(
internalResponse,
scrollId,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
null,
shardFailures,
clusters,
pointInTimeId
);
}

public SearchResponse(
Expand All @@ -136,6 +170,7 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
Expand All @@ -148,6 +183,7 @@ public SearchResponse(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.phaseTook = phaseTook;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId ["
Expand Down Expand Up @@ -210,6 +246,13 @@ public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

/**
* How long the request took in each search phase.
*/
public PhaseTook getPhaseTook() {
return phaseTook;
}

/**
* The total number of shards the search was executed on.
*/
Expand Down Expand Up @@ -298,6 +341,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
if (phaseTook != null) {
phaseTook.toXContent(builder, params);
}
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
Expand Down Expand Up @@ -337,6 +383,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Boolean terminatedEarly = null;
int numReducePhases = 1;
long tookInMillis = -1;
PhaseTook phaseTook = null;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
Expand Down Expand Up @@ -401,6 +448,24 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
parser.skipChildren();
}
}
} else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
Map<String, Long> phaseTookMap = new HashMap<>();

while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
try {
SearchPhaseName.valueOf(currentFieldName.toUpperCase(Locale.ROOT));
phaseTookMap.put(currentFieldName, parser.longValue());
} catch (final IllegalArgumentException ex) {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
phaseTook = new PhaseTook(phaseTookMap);
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1;
int total = -1;
Expand Down Expand Up @@ -472,6 +537,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
clusters,
searchContextId
Expand All @@ -491,6 +557,9 @@ public void writeTo(StreamOutput out) throws IOException {
clusters.writeTo(out);
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(phaseTook);
}
out.writeVInt(skippedShards);
out.writeOptionalString(pointInTimeId);
}
Expand Down Expand Up @@ -604,6 +673,68 @@ public String toString() {
}
}

/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*
* @opensearch.internal
*/
public static class PhaseTook implements ToXContentFragment, Writeable {
static final ParseField PHASE_TOOK = new ParseField("phase_took");
private final Map<String, Long> phaseTookMap;

public PhaseTook(Map<String, Long> phaseTookMap) {
this.phaseTookMap = phaseTookMap;
}

private PhaseTook(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, StreamInput::readLong));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(phaseTookMap, StreamOutput::writeString, StreamOutput::writeLong);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(PHASE_TOOK.getPreferredName());

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
if (phaseTookMap.containsKey(searchPhaseName.getName())) {
builder.field(searchPhaseName.getName(), phaseTookMap.get(searchPhaseName.getName()));
} else {
builder.field(searchPhaseName.getName(), 0);
}
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhaseTook phaseTook = (PhaseTook) o;

for (String searchPhaseNameString : phaseTookMap.keySet()) {
if (!phaseTookMap.get(searchPhaseNameString).equals(phaseTook.phaseTookMap.get(searchPhaseNameString))) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return Objects.hash(phaseTookMap);
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading

0 comments on commit 65c3905

Please sign in to comment.