Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish netty span when request is cancelled #7900

Merged
merged 3 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
ygree marked this conversation as resolved.
Show resolved Hide resolved
throw throwable;
}
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelInactive(ctx);
} finally {
try {
final AgentSpan span = ctx.channel().attr(SPAN_ATTRIBUTE_KEY).getAndRemove();
if (span != null && span.phasedFinish()) {
// at this point we can just publish this span to avoid loosing the rest of the trace
span.publish();
}
} catch (final Throwable ignored) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
if (response.getStatus() != HttpResponseStatus.CONTINUE) {
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(span);
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,25 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelInactive(ctx);
} finally {
try {
final AgentSpan span = ctx.channel().attr(SPAN_ATTRIBUTE_KEY).getAndRemove();
if (span != null && span.phasedFinish()) {
// at this point we can just publish this span to avoid loosing the rest of the trace
span.publish();
}
} catch (final Throwable ignored) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
throw throwable;
}
if (response.status() != HttpResponseStatus.CONTINUE
Expand All @@ -44,6 +45,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
ctx.channel().attr(SPAN_ATTRIBUTE_KEY).remove();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import reactor.core.publisher.Mono
import reactor.netty.http.HttpProtocol
import reactor.netty.http.client.HttpClient

import java.time.Duration

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
classes = [SpringWebFluxTestApplication],
properties = "server.http2.enabled=true")
Expand Down Expand Up @@ -656,6 +658,59 @@ class SpringWebfluxHttp11Test extends AgentTestRunner {
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
}

def "Cancellation should always release the server span"() {
setup:
String url = "http://localhost:$port/very-delayed"
when:
def response = client.get().uri(url).exchange().timeout(Duration.ofSeconds(2)).block()

then:
thrown Exception
assert response == null
assertTraces(2) {
def traceParent
sortSpansByStart()
trace(2) {
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), null, false, null, false,
[ "message":"The subscription was cancelled", "event":"cancelled"])
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), null)
}
trace(2) {
span {
resourceName "GET /very-delayed"
operationName "netty.request"
spanType DDSpanTypes.HTTP_SERVER
childOf(traceParent)
tags {
"$Tags.COMPONENT" "netty"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"$Tags.PEER_HOST_IPV4" "127.0.0.1"
"$Tags.PEER_PORT" Integer
"$Tags.HTTP_URL" url
"$Tags.HTTP_HOSTNAME" "localhost"
"$Tags.HTTP_METHOD" "GET"
"$Tags.HTTP_USER_AGENT" String
"$Tags.HTTP_CLIENT_IP" "127.0.0.1"
"$Tags.HTTP_ROUTE" "/very-delayed"
defaultTags(true)
}
}
span {
resourceName "TestController.getVeryDelayedMono"
operationName "TestController.getVeryDelayedMono"
spanType DDSpanTypes.HTTP_SERVER
childOfPrevious()
tags {
"$Tags.COMPONENT" "spring-webflux-controller"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"handler.type" TestController.getName()
defaultTags()
}
}
}
}
}

def clientSpan(
TraceAssert trace,
Object parentSpan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import datadog.trace.api.Trace
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

import java.time.Duration
Expand Down Expand Up @@ -56,6 +57,12 @@ class TestController {
return Mono.just(id).delayElement(Duration.ofMillis(100)).map { i -> tracedMethod(i) }
}

@GetMapping("/very-delayed")
Mono<ServerResponse> getVeryDelayedMono() {
return Mono.delay(Duration.ofSeconds(30)) // long enough not to finish
.then(ServerResponse.status(200).build())
}

@Trace()
private FooModel tracedMethod(long id) {
return new FooModel(id, "tracedMethod")
Expand Down