diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a37866012ba1..eea7acde5ec0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122)) - Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246)) - Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200)) +- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273)) ### Deprecated @@ -141,4 +142,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index bfd4f48a7748d..6ce51ee136861 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -654,58 +654,59 @@ protected void doRun() { ); final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest)); - ActionListener traceableActionListener = TraceableActionListener.create( - ActionListener.runBefore(new ActionListener() { - @Override - public void onResponse(BulkShardResponse bulkShardResponse) { - for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { - // we may have no response if item failed - if (bulkItemResponse.getResponse() != null) { - bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); - } + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + shardBulkAction.execute( + bulkShardRequest, + TraceableActionListener.create(ActionListener.runBefore(new ActionListener() { + @Override + public void onResponse(BulkShardResponse bulkShardResponse) { + for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { + // we may have no response if item failed + if (bulkItemResponse.getResponse() != null) { + bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); + } - docStatusStats.inc(bulkItemResponse.status()); - responses.set(bulkItemResponse.getItemId(), bulkItemResponse); - } + docStatusStats.inc(bulkItemResponse.status()); + responses.set(bulkItemResponse.getItemId(), bulkItemResponse); + } - if (counter.decrementAndGet() == 0) { - finishHim(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } } - } - @Override - public void onFailure(Exception e) { - // create failures for all relevant requests - for (BulkItemRequest request : requests) { - final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - final DocWriteRequest docWriteRequest = request.request(); - final BulkItemResponse bulkItemResponse = new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ); + @Override + public void onFailure(Exception e) { + // create failures for all relevant requests + for (BulkItemRequest request : requests) { + final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( + request.id(), + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) + ); + + docStatusStats.inc(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); + } - docStatusStats.inc(bulkItemResponse.status()); - responses.set(request.id(), bulkItemResponse); + if (counter.decrementAndGet() == 0) { + finishHim(); + } } - if (counter.decrementAndGet() == 0) { - finishHim(); + private void finishHim() { + indicesService.addDocStatusStats(docStatusStats); + listener.onResponse( + new BulkResponse( + responses.toArray(new BulkItemResponse[responses.length()]), + buildTookInMillis(startTimeNanos) + ) + ); } - } - - private void finishHim() { - indicesService.addDocStatusStats(docStatusStats); - listener.onResponse( - new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) - ); - } - }, releasable::close), - span, - tracer - ); - try(SpanScope spanScope = tracer.withSpanInScope(span)) { - shardBulkAction.execute(bulkShardRequest, traceableActionListener); + }, releasable::close), span, tracer) + ); } } bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 55f874217b0c7..68598cad802b3 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -416,8 +416,10 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, releasable::close ); - final Span span = getTracer().startSpan(SpanBuilder.from("shardPrimaryWrite", clusterService.localNode().getId(), request.getRequest().shardId())); - try(SpanScope spanScope = getTracer().withSpanInScope(span)) { + final Span span = getTracer().startSpan( + SpanBuilder.from("shardPrimaryWrite", clusterService.localNode().getId(), request.getRequest().shardId()) + ); + try (SpanScope spanScope = getTracer().withSpanInScope(span)) { new AsyncPrimaryAction(request, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task).run(); } catch (RuntimeException e) { listener.onFailure(e); @@ -698,10 +700,12 @@ protected void handleReplicaRequest( releasable::close ); - final Span span = getTracer().startSpan(SpanBuilder.from( - "shardReplicaWrite", clusterService.localNode().getId(), replicaRequest.getRequest().shardId())); - try(SpanScope spanScope = getTracer().withSpanInScope(span)) { - new AsyncReplicaAction(replicaRequest, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task).run(); + final Span span = getTracer().startSpan( + SpanBuilder.from("shardReplicaWrite", clusterService.localNode().getId(), replicaRequest.getRequest().shardId()) + ); + try (SpanScope spanScope = getTracer().withSpanInScope(span)) { + new AsyncReplicaAction(replicaRequest, TraceableActionListener.create(listener, span, getTracer()), (ReplicationTask) task) + .run(); } catch (RuntimeException e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index d459d9bbcf500..dad5e51d16788 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -78,7 +78,7 @@ private AttributeNames() { /** * Number of request items in bulk request */ - public static final String NUM_REQUEST_ITEMS ="num_request_items"; + public static final String NUM_BULK_ITEMS = "num_bulk_items"; /** * Node ID diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 13d3210368dd8..70ff6e7a8360c 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -139,15 +139,15 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio private static Attributes buildSpanAttributes(String nodeId, BulkShardRequest bulkShardRequest) { Attributes attributes = buildSpanAttributes(nodeId, bulkShardRequest.shardId()); - attributes.addAttribute(AttributeNames.NUM_REQUEST_ITEMS, bulkShardRequest.items().length); + attributes.addAttribute(AttributeNames.NUM_BULK_ITEMS, bulkShardRequest.items().length); return attributes; } private static Attributes buildSpanAttributes(String nodeId, ShardId shardId) { Attributes attributes = Attributes.create() .addAttribute(AttributeNames.NODE_ID, nodeId) - .addAttribute(AttributeNames.INDEX, (shardId!=null)?shardId.getIndexName():"NULL") - .addAttribute(AttributeNames.SHARD_ID, (shardId!=null)?shardId.getId():-1); + .addAttribute(AttributeNames.INDEX, (shardId != null) ? shardId.getIndexName() : "NULL") + .addAttribute(AttributeNames.SHARD_ID, (shardId != null) ? shardId.getId() : -1); return attributes; } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 0f67eff26cbde..cf7080ab2fc06 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -54,6 +54,7 @@ import org.opensearch.index.VersionType; import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -155,7 +156,8 @@ private void indicesThatCannotBeCreatedTestCase( new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), null, - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ) { @Override void executeBulk( diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 515f6eae28a34..141c630b94020 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -70,6 +70,7 @@ import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -172,7 +173,8 @@ class TestTransportBulkAction extends TransportBulkAction { new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), null, - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index 10cad6fb147a2..6bbd740df7f9c 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -118,7 +118,8 @@ class TestTransportBulkAction extends TransportBulkAction { new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), mock(IndicesService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index 852e3837e1e7a..9d5b4430ea395 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -282,7 +282,8 @@ static class TestTransportBulkAction extends TransportBulkAction { new IndexingPressureService(Settings.EMPTY, clusterService), null, new SystemIndices(emptyMap()), - relativeTimeProvider + relativeTimeProvider, + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index fe0fdd07025d9..b325cfa197933 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -88,6 +88,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; @@ -1074,7 +1075,8 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); action.handlePrimaryTermValidationRequest( new TransportShardBulkAction.PrimaryTermValidationRequest(aId + "-1", 1, shardId), @@ -1105,7 +1107,8 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); action.handlePrimaryTermValidationRequest( new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId), @@ -1136,7 +1139,8 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); action.handlePrimaryTermValidationRequest( new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId), @@ -1178,7 +1182,8 @@ private TransportShardBulkAction createAction() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 09f5c1bea1a5e..a3fb7795afb44 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2187,7 +2187,8 @@ public void onFailure(final Exception e) { mock(ThreadPool.class) ), mock(RemoteStorePressureService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); actions.put( BulkAction.INSTANCE, @@ -2211,7 +2212,8 @@ public void onFailure(final Exception e) { new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), new IndexingPressureService(settings, clusterService), mock(IndicesService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ) ); final RestoreService restoreService = new RestoreService(