Skip to content

Commit

Permalink
Add reactor samples and doc (#7906)
Browse files Browse the repository at this point in the history
* Add reactor samples and doc

* Apply suggestions from code review

Co-authored-by: Bruce Bujon <[email protected]>

* review

---------

Co-authored-by: Bruce Bujon <[email protected]>
  • Loading branch information
amarziali and PerfectSlayer authored Nov 12, 2024
1 parent f0b7459 commit 6082863
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 0 deletions.
116 changes: 116 additions & 0 deletions docs/reactor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Reactor with dd-trace-java

This project shows basic examples of manual tracing with reactor and the datadog java tracer.
The examples are provided in the form of JUnit tests. Manual tracing is achieved using the OpenTelemetry APIs.

## Get started

The project is configured to run with a JDK 17 and Apache Maven.
To get started on a command line, just type `mvn test`. It will generate and log traces in a json format on the
console.

## How the context is propagated

The Reactor context propagates bottom up from the first subscriber (last publisher) to the last subscriber (first publisher).
The datadog tracer captures the active span when the subscription happens (i.e. methods like `subcribe` or `block` are called)
and activates them when a publisher emits if there is no an already active span.

This works out of the box for bottom-up propagation. For the opposite, top-down, the reactor context has to be used in order
to let know the publisher which span needs to be activated when emitting downstream. The span has to be
added to the context using the key `dd.span`.

## Use cases

The use cases are reflecting the tests in this project

### Standard bottom-up context propagation

The sample leverages the `@Trace` annotation to create a span when a method returning a `Mono` or `Flux` is called.
The annotation is available as part of the `dd-trace-api` library. Alternatively, the OpenTelemetry equivalent `@WithSpan` can
also be used.

```java
@Trace(operationName = "mono", resourceName = "mono")
private Mono<String> monoMethod() {
// ...
}
```

Since the tracer runs with the option `-Ddd.trace.annotation.async=true`, it will finish the span when the `Mono`
will complete and not when the method will return.

In this test the context is captured when `block()` is called and every other span opened by the upstream operators
will have it as parent.

The diagram below shows the span propagated onSubscribe (up arrows)
and the span propagated downstream onNext/onError/onComplete (down arrows).

```mermaid
graph TD;
m1[Mono.just<br><em>creates mono</em>]-->|parent|m2;
m2[Mono.map<br><em>creates child</em>]-->|parent|m3[Mono.block];
m3-->|parent|m2;
m2-->|parent|m1;
```

The resulting trace:

![img.png](img.png)


### Top Down context propagation

The context propagation can be changed by advising the span to publish via the reactor `Context`.
The span should be put under the key `dd.span`.
As soon as a publisher emits this span, all the downstream operators will also have that span as active.
It is important to use `ContextWrite` in the right places in the reactive chain for this reason.

Relating to the `testSimpleDownstreamPropagation` test case, the reactive chains will capture `parent` as bottom-up propagation
when `block` is called, but then the propagation is changed when `contextWrite("dd.span", ...)` is called.

The diagram below shows what's happening:

```mermaid
graph TD;
m1[Mono.defer+contextWrite<br><em>creates mono</em>]-->|mono|m2;
m2[Mono.map<br><em>creates child</em>]-->|mono|m3[Mono.block];
m3-->|parent|m2;
m2-->|parent|m1;
```

The resulting trace:

![img_1.png](img_1.png)

### A more complex scenario

`ContextWrite` can be called several times in the chain in order to change the active span that will be propagated.
In fact, generally speaking, when a span is put in the context, it will be propagated by all the upstream publishers
that will have visibility to that reactor context.

Referring to the `testComplexDownstreamPropagation` test case, the propagation is resumed in the following (simplified) diagram:

```mermaid
graph TD;
m1[Mono.flatmap<br><em>creates first</em>]-->|first|m2;
m2[Mono.contextWrite<br><em>set first</em>]-->|first|m3;
m3[Mono.map<br><em>creates child</em>]-->|first|m4;
m4[Mono.flatmap<br><em>creates second</em>]-->|second|m5;
m5[Mono.contextWrite<br><em>set second</em>]-->|second|m6;
m6[Mono.flatmap+contextWrite<br><em>creates third</em>]-->|third|m7;
m7[Mono.flatmap+contextWrite<br><em>creates third</em>]-->|third|m8;
m8[Mono.map<br><em>creates child</em>]-->|third|m9[Mono.block];
m6-->|parent|m5;
m5-->|parent|m4;
m4-->|third|m3;
m3-->|second|m2;
m2-->|second|m1;
m10[start]-->|parent|m1;
```

The graph starts with parent since it's the span captured when `block` is called.
Each flatmap changes the context's active span when `onNext` is signaled

The resulting trace:
![img_2.png](img_2.png)

Binary file added docs/reactor/img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/reactor/img_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/reactor/img_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
83 changes: 83 additions & 0 deletions docs/reactor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>dd-trace-java-reactor-examples</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.11</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.11</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.43.0</version>
</dependency>
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>dd-trace-api</artifactId>
<version>1.42.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.8.0</version>
<executions>
<execution>
<id>copy-agent</id>
<phase>process-test-classes</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>com.datadoghq</groupId>
<artifactId>dd-java-agent</artifactId>
<version>1.42.0</version>
<outputDirectory>${project.build.directory}/agents</outputDirectory>
<destFileName>dd-java-agent.jar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<argLine>-javaagent:${project.build.directory}/agents/dd-java-agent.jar -Ddd.trace.otel.enabled=true -Ddd.trace.annotation.async=true</argLine>
</configuration>
</plugin>
</plugins>
</build>

</project>
120 changes: 120 additions & 0 deletions docs/reactor/src/test/java/test/ReactorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package test;

import datadog.trace.api.Trace;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.time.Duration;
import java.util.Objects;
import java.util.Random;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ReactorTest {

@Trace(operationName = "child", resourceName = "child")
private String doSomeMapping(String s) {
try {
// simulate some work
Thread.sleep(500 + new Random(System.currentTimeMillis()).nextInt(1000));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return s;
}

@Trace(operationName = "mono", resourceName = "mono")
private Mono<String> monoMethod() {
// This mono will complete when the delay is expired
return Mono.delay(Duration.ofSeconds(1)).map(ignored -> "Hello World");
}

@Trace(operationName = "mono", resourceName = "mono")
private Mono<String> monoMethodDownstreamPropagate() {
// here the active span is the one created by the @Trace annotation before the method executes
return Mono.just("Hello World").contextWrite(Context.of("dd.span", Span.current()));
}

private <T> Mono<T> tracedMono(
final Tracer tracer, final String spanName, Span parentSpan, final Mono<T> mono) {
SpanBuilder spanBuilder = tracer.spanBuilder(spanName);
if (parentSpan != null) {
spanBuilder.setParent(io.opentelemetry.context.Context.current().with(parentSpan));
}
final Span span = spanBuilder.startSpan();
return mono //
.contextWrite(Context.of("dd.span", span))
.doFinally(ignored -> span.end());
}

@Test
public void testSimpleUpstreamPropagation() {
final Tracer tracer = GlobalOpenTelemetry.getTracer("");
final Span parent = tracer.spanBuilder("parent").startSpan();
assert io.opentelemetry.context.Context.current() == io.opentelemetry.context.Context.root();
try (final Scope parentScope = parent.makeCurrent()) {
// monoMethod will start a trace when called but that span will complete only when the
// returned mono completes.
// doSomeMapping will open a span that's child of parent because it's the active one when we
// subscribe
assert Objects.equals(monoMethod().map(this::doSomeMapping).block(), "Hello World");
} finally {
parent.end();
}
}

@Test
public void testSimpleDownstreamPropagation() {
final Tracer tracer = GlobalOpenTelemetry.getTracer("");
final Span parent = tracer.spanBuilder("parent").startSpan();
assert io.opentelemetry.context.Context.current() == io.opentelemetry.context.Context.root();
try (final Scope parentScope = parent.makeCurrent()) {
// monoMethod will start a trace when called but that span will complete only when the
// returned mono completes.
// doSomeMapping will open a span that's child of parent because it's the active one when we
// subscribe
assert Objects.equals(
Mono.defer(this::monoMethodDownstreamPropagate).map(this::doSomeMapping).block(),
"Hello World");
} finally {
parent.end();
}
}

@Test
public void testComplexDownstreamPropagation() {
final Tracer tracer = GlobalOpenTelemetry.getTracer("");
final Span parent = tracer.spanBuilder("parent").startSpan();
assert io.opentelemetry.context.Context.current() == io.opentelemetry.context.Context.root();

Mono<String> mono =
// here we have no active span. when the mono is emitted we propagate the context captured
// onSubscribe
Mono.just("Hello World") //
// change the downstream propagated span to that new one called 'first'
// first will be child of parent since parent was captured onSubscribe
// (when block is called) and propagated upstream
.flatMap(s -> tracedMono(tracer, "first", null, Mono.just(s + ", GoodBye ")))
// map will use the active one (first) hence the child will be under first
.map(this::doSomeMapping)
// we change again the downstream active span to 'second' that's child of 'first'
.flatMap(
s ->
tracedMono(
tracer, "second", null, Mono.create(sink -> sink.success(s + "World"))))
// now we let the downstream propagate third child of parent
.flatMap(s -> tracedMono(tracer, "third", parent, Mono.just(s + "!")))
// third is the active span downstream
.map(this::doSomeMapping) // will create a child span having third as parent
.doOnNext(System.out::println);
try (final Scope parentScope = parent.makeCurrent()) {
// block, like subscribe will capture the current scope (parent here) and propagate upstream
assert Objects.equals(mono.block(), "Hello World, GoodBye World!");
} finally {
parent.end();
}
}
}

0 comments on commit 6082863

Please sign in to comment.