Skip to content

Commit

Permalink
Fix failing tests and review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <[email protected]>
  • Loading branch information
rayshrey committed Oct 3, 2023
1 parent f7f3374 commit 530c15d
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 65 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122))
- Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246))
- Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))
- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273))

### Deprecated

Expand All @@ -141,4 +142,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -654,58 +654,59 @@ protected void doRun() {
);

final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
ActionListener traceableActionListener = TraceableActionListener.create(
ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
shardBulkAction.execute(
bulkShardRequest,
TraceableActionListener.create(ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}

@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

if (counter.decrementAndGet() == 0) {
finishHim();
private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)
)
);
}
}

private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
}
}, releasable::close),
span,
tracer
);
try(SpanScope spanScope = tracer.withSpanInScope(span)) {
shardBulkAction.execute(bulkShardRequest, traceableActionListener);
}, releasable::close), span, tracer)
);
}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,10 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
releasable::close
);

final Span span = getTracer().startSpan(SpanBuilder.from("shardPrimaryWrite", clusterService.localNode().getId(), request.getRequest().shardId()));
try(SpanScope spanScope = getTracer().withSpanInScope(span)) {
final Span span = getTracer().startSpan(
SpanBuilder.from("shardPrimaryWrite", clusterService.localNode().getId(), request.getRequest().shardId())
);
try (SpanScope spanScope = getTracer().withSpanInScope(span)) {
new AsyncPrimaryAction(request, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task).run();
} catch (RuntimeException e) {
listener.onFailure(e);
Expand Down Expand Up @@ -698,10 +700,12 @@ protected void handleReplicaRequest(
releasable::close
);

final Span span = getTracer().startSpan(SpanBuilder.from(
"shardReplicaWrite", clusterService.localNode().getId(), replicaRequest.getRequest().shardId()));
try(SpanScope spanScope = getTracer().withSpanInScope(span)) {
new AsyncReplicaAction(replicaRequest, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task).run();
final Span span = getTracer().startSpan(
SpanBuilder.from("shardReplicaWrite", clusterService.localNode().getId(), replicaRequest.getRequest().shardId())
);
try (SpanScope spanScope = getTracer().withSpanInScope(span)) {
new AsyncReplicaAction(replicaRequest, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task)
.run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private AttributeNames() {
/**
* Number of request items in bulk request
*/
public static final String NUM_REQUEST_ITEMS ="num_request_items";
public static final String NUM_BULK_ITEMS = "num_bulk_items";

/**
* Node ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio

private static Attributes buildSpanAttributes(String nodeId, BulkShardRequest bulkShardRequest) {
Attributes attributes = buildSpanAttributes(nodeId, bulkShardRequest.shardId());
attributes.addAttribute(AttributeNames.NUM_REQUEST_ITEMS, bulkShardRequest.items().length);
attributes.addAttribute(AttributeNames.NUM_BULK_ITEMS, bulkShardRequest.items().length);
return attributes;
}

private static Attributes buildSpanAttributes(String nodeId, ShardId shardId) {
Attributes attributes = Attributes.create()
.addAttribute(AttributeNames.NODE_ID, nodeId)
.addAttribute(AttributeNames.INDEX, (shardId!=null)?shardId.getIndexName():"NULL")
.addAttribute(AttributeNames.SHARD_ID, (shardId!=null)?shardId.getId():-1);
.addAttribute(AttributeNames.INDEX, (shardId != null) ? shardId.getIndexName() : "NULL")
.addAttribute(AttributeNames.SHARD_ID, (shardId != null) ? shardId.getId() : -1);
return attributes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.index.VersionType;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -155,7 +156,8 @@ private void indicesThatCannotBeCreatedTestCase(
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
null,
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
) {
@Override
void executeBulk(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -172,7 +173,8 @@ class TestTransportBulkAction extends TransportBulkAction {
new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
null,
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class TestTransportBulkAction extends TransportBulkAction {
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
new IndexingPressureService(Settings.EMPTY, clusterService),
mock(IndicesService.class),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ static class TestTransportBulkAction extends TransportBulkAction {
new IndexingPressureService(Settings.EMPTY, clusterService),
null,
new SystemIndices(emptyMap()),
relativeTimeProvider
relativeTimeProvider,
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
Expand Down Expand Up @@ -1074,7 +1075,8 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() {
mock(IndexingPressureService.class),
mock(SegmentReplicationPressureService.class),
mock(RemoteStorePressureService.class),
mock(SystemIndices.class)
mock(SystemIndices.class),
NoopTracer.INSTANCE
);
action.handlePrimaryTermValidationRequest(
new TransportShardBulkAction.PrimaryTermValidationRequest(aId + "-1", 1, shardId),
Expand Down Expand Up @@ -1105,7 +1107,8 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() {
mock(IndexingPressureService.class),
mock(SegmentReplicationPressureService.class),
mock(RemoteStorePressureService.class),
mock(SystemIndices.class)
mock(SystemIndices.class),
NoopTracer.INSTANCE
);
action.handlePrimaryTermValidationRequest(
new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId),
Expand Down Expand Up @@ -1136,7 +1139,8 @@ public void testHandlePrimaryTermValidationRequestSuccess() {
mock(IndexingPressureService.class),
mock(SegmentReplicationPressureService.class),
mock(RemoteStorePressureService.class),
mock(SystemIndices.class)
mock(SystemIndices.class),
NoopTracer.INSTANCE
);
action.handlePrimaryTermValidationRequest(
new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId),
Expand Down Expand Up @@ -1178,7 +1182,8 @@ private TransportShardBulkAction createAction() {
mock(IndexingPressureService.class),
mock(SegmentReplicationPressureService.class),
mock(RemoteStorePressureService.class),
mock(SystemIndices.class)
mock(SystemIndices.class),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2187,7 +2187,8 @@ public void onFailure(final Exception e) {
mock(ThreadPool.class)
),
mock(RemoteStorePressureService.class),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
actions.put(
BulkAction.INSTANCE,
Expand All @@ -2211,7 +2212,8 @@ public void onFailure(final Exception e) {
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())),
new IndexingPressureService(settings, clusterService),
mock(IndicesService.class),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
)
);
final RestoreService restoreService = new RestoreService(
Expand Down

0 comments on commit 530c15d

Please sign in to comment.