Skip to content

Commit

Permalink
Finish netty span when request is cancelled (#7900)
Browse files Browse the repository at this point in the history
* Finish netty span when request is cancelled

* Propagate channelInactive to next handler
  • Loading branch information
amarziali authored Nov 8, 2024
1 parent 3a207ff commit 9e8d0dc
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelInactive(ctx);
} finally {
try {
final AgentSpan span = ctx.channel().attr(SPAN_ATTRIBUTE_KEY).getAndRemove();
if (span != null && span.phasedFinish()) {
// at this point we can just publish this span to avoid loosing the rest of the trace
span.publish();
}
} catch (final Throwable ignored) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
if (response.getStatus() != HttpResponseStatus.CONTINUE) {
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(span);
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelInactive(ctx);
} finally {
try {
final AgentSpan span = ctx.channel().attr(SPAN_ATTRIBUTE_KEY).getAndRemove();
if (span != null && span.phasedFinish()) {
// at this point we can just publish this span to avoid loosing the rest of the trace
span.publish();
}
} catch (final Throwable ignored) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
if (response.status() != HttpResponseStatus.CONTINUE
Expand All @@ -44,6 +45,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import reactor.core.publisher.Mono
import reactor.netty.http.HttpProtocol
import reactor.netty.http.client.HttpClient

import java.time.Duration

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = [SpringWebFluxTestApplication],
properties = "server.http2.enabled=true")
Expand Down Expand Up @@ -656,6 +658,59 @@ class SpringWebfluxHttp11Test extends AgentTestRunner {
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
}

def "Cancellation should always release the server span"() {
setup:
String url = "http://localhost:$port/very-delayed"
when:
def response = client.get().uri(url).exchange().timeout(Duration.ofSeconds(2)).block()

then:
thrown Exception
assert response == null
assertTraces(2) {
def traceParent
sortSpansByStart()
trace(2) {
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), null, false, null, false,
[ "message":"The subscription was cancelled", "event":"cancelled"])
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), null)
}
trace(2) {
span {
resourceName "GET /very-delayed"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
childOf(traceParent)
tags {
"$Tags.COMPONENT" "netty"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"$Tags.PEER_HOST_IPV4" "127.0.0.1"
"$Tags.PEER_PORT" Integer
"$Tags.HTTP_URL" url
"$Tags.HTTP_HOSTNAME" "localhost"
"$Tags.HTTP_METHOD" "GET"
"$Tags.HTTP_USER_AGENT" String
"$Tags.HTTP_CLIENT_IP" "127.0.0.1"
"$Tags.HTTP_ROUTE" "/very-delayed"
defaultTags(true)
}
}
span {
resourceName "TestController.getVeryDelayedMono"
operationName "TestController.getVeryDelayedMono"
spanType DDSpanTypes.HTTP_SERVER
childOfPrevious()
tags {
"$Tags.COMPONENT" "spring-webflux-controller"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"handler.type" TestController.getName()
defaultTags()
}
}
}
}
}

def clientSpan(
TraceAssert trace,
Object parentSpan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import datadog.trace.api.Trace
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

import java.time.Duration
Expand Down Expand Up @@ -56,6 +57,12 @@ class TestController {
return Mono.just(id).delayElement(Duration.ofMillis(100)).map { i -> tracedMethod(i) }
}

@GetMapping("/very-delayed")
Mono<ServerResponse> getVeryDelayedMono() {
return Mono.delay(Duration.ofSeconds(30)) // long enough not to finish
.then(ServerResponse.status(200).build())
}

@Trace()
private FooModel tracedMethod(long id) {
return new FooModel(id, "tracedMethod")
Expand Down

0 comments on commit 9e8d0dc

Please sign in to comment.