diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/brave/ITZipkinSelfTracing.java b/zipkin-server/src/test/java/zipkin2/server/internal/brave/ITZipkinSelfTracing.java index ab3fb248125..b25485ce2cf 100644 --- a/zipkin-server/src/test/java/zipkin2/server/internal/brave/ITZipkinSelfTracing.java +++ b/zipkin-server/src/test/java/zipkin2/server/internal/brave/ITZipkinSelfTracing.java @@ -15,12 +15,14 @@ import com.linecorp.armeria.server.Server; import java.io.IOException; +import java.util.Collections; import java.util.List; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -38,17 +40,21 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static zipkin2.TestObjects.DAY; -import static zipkin2.TestObjects.TRACE; +import static zipkin2.TestObjects.TODAY; import static zipkin2.server.internal.ITZipkinServer.url; +/** + * This class is flaky for as yet unknown reasons. For example, in Travis, sometimes assertions fail + * due to incomplete traces. Hence, it includes more assertion customization than normal. + */ @SpringBootTest( classes = ZipkinServer.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "spring.config.name=zipkin-server", "zipkin.self-tracing.enabled=true", - "zipkin.self-tracing.message-timeout=1ms", - "zipkin.self-tracing.traces-per-second=10" + "zipkin.self-tracing.message-timeout=100ms", + "zipkin.self-tracing.traces-per-second=100" }) @RunWith(SpringRunner.class) public class ITZipkinSelfTracing { @@ -67,36 +73,38 @@ InMemoryStorage inMemoryStorage() { } @Test public void getIsTraced_v2() throws Exception { - assertThat(get("v2").body().string()).isEqualTo("[]"); + assertThat(getServices("v2").body().string()).isEqualTo("[]"); - awaitSpans(); + List> traces = awaitSpans(2); - assertThat(getTraces(QueryRequest.newBuilder() - .annotationQuery(singletonMap("http.path", "/api/v2/services")))).isNotEmpty(); + assertQueryReturnsResults(QueryRequest.newBuilder() + .annotationQuery(singletonMap("http.path", "/api/v2/services")), traces); - assertThat(getTraces(QueryRequest.newBuilder().spanName("get-service-names"))).isNotEmpty(); + assertQueryReturnsResults(QueryRequest.newBuilder().spanName("get-service-names"), traces); } - @Test public void postIsTraced_v1() throws Exception { - post("v1"); + @Test @Ignore("https://github.com/openzipkin/zipkin/issues/2781") + public void postIsTraced_v1() throws Exception { + postSpan("v1"); - awaitSpans(); + List> traces = awaitSpans(3); // test span + POST + accept-spans - assertThat(getTraces(QueryRequest.newBuilder() - .annotationQuery(singletonMap("http.path", "/api/v1/spans")))).isNotEmpty(); + assertQueryReturnsResults(QueryRequest.newBuilder() + .annotationQuery(singletonMap("http.path", "/api/v1/spans")), traces); - assertThat(getTraces(QueryRequest.newBuilder().spanName("accept-spans"))).isNotEmpty(); + assertQueryReturnsResults(QueryRequest.newBuilder().spanName("accept-spans"), traces); } - @Test public void postIsTraced_v2() throws Exception { - post("v2"); + @Test @Ignore("https://github.com/openzipkin/zipkin/issues/2781") + public void postIsTraced_v2() throws Exception { + postSpan("v2"); - awaitSpans(); + List> traces = awaitSpans(3); // test span + POST + accept-spans - assertThat(getTraces(QueryRequest.newBuilder() - .annotationQuery(singletonMap("http.path", "/api/v2/spans")))).isNotEmpty(); + assertQueryReturnsResults(QueryRequest.newBuilder() + .annotationQuery(singletonMap("http.path", "/api/v2/spans")), traces); - assertThat(getTraces(QueryRequest.newBuilder().spanName("accept-spans"))).isNotEmpty(); + assertQueryReturnsResults(QueryRequest.newBuilder().spanName("accept-spans"), traces); } /** @@ -110,31 +118,58 @@ InMemoryStorage inMemoryStorage() { assertThat(reporter).hasToString("AsyncReporter{StorageComponent}"); } - void awaitSpans() { - await().untilAsserted(// wait for spans - () -> assertThat(inMemoryStorage().acceptedSpanCount()).isGreaterThanOrEqualTo(1)); + List> awaitSpans(int count) { + await().untilAsserted(() -> { // wait for spans + List> traces = inMemoryStorage().getTraces(); + long received = traces.stream().flatMap(List::stream).count(); + assertThat(inMemoryStorage().acceptedSpanCount()) + .withFailMessage("Wanted %s spans: got %s. Current traces: %s", count, received, traces) + .isGreaterThanOrEqualTo(count); + }); + return inMemoryStorage().getTraces(); + } + + void assertQueryReturnsResults(QueryRequest.Builder builder, List> traces) + throws IOException { + QueryRequest query = builder.endTs(System.currentTimeMillis()).lookback(DAY).limit(2).build(); + assertThat(inMemoryStorage().getTraces(query).execute()) + .withFailMessage("Expected results from %s. Current traces: %s", query, traces) + .isNotEmpty(); } - void post(String version) throws IOException { + /** + * This POSTs a single span. Afterwards, we expect this trace in storage, and also the self-trace + * of POSTing it. + */ + void postSpan(String version) throws IOException { SpanBytesEncoder encoder = "v1".equals(version) ? SpanBytesEncoder.JSON_V1 : SpanBytesEncoder.JSON_V2; - client.newCall(new Request.Builder() + + List testTrace = Collections.singletonList( + Span.newBuilder().timestamp(TODAY).traceId("1").id("2").name("test-trace").build() + ); + + Response response = client.newCall(new Request.Builder() .url(url(server, "/api/" + version + "/spans")) - .post(RequestBody.create(null, encoder.encodeList(TRACE))) + .post(RequestBody.create(null, encoder.encodeList(testTrace))) .build()) .execute(); + assertSuccessful(response); } - List> getTraces(QueryRequest.Builder request) throws IOException { - return inMemoryStorage().getTraces( - request.endTs(System.currentTimeMillis()).lookback(DAY).limit(2).build() - ).execute(); - } - - Response get(String version) throws IOException { - return client.newCall(new Request.Builder() + Response getServices(String version) throws IOException { + Response response = client.newCall(new Request.Builder() .url(url(server, "/api/" + version + "/services")) .build()) .execute(); + assertSuccessful(response); + return response; + } + + static void assertSuccessful(Response response) throws IOException { + assertThat(response.isSuccessful()) + .withFailMessage("unsuccessful %s: %s", response.request(), + response.peekBody(Long.MAX_VALUE).string()) + .isTrue(); } } diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java index f0bd8237904..1304e1207af 100644 --- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java +++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java @@ -26,6 +26,8 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import zipkin2.Call; import zipkin2.Callback; import zipkin2.DependencyLink; @@ -163,7 +165,7 @@ Collection valueContainer() { final int maxSpanCount; final Call> autocompleteKeysCall; final Set autocompleteKeys; - volatile int acceptedSpanCount; + final AtomicInteger acceptedSpanCount = new AtomicInteger(); InMemoryStorage(Builder builder) { this.strictTraceId = builder.strictTraceId; @@ -174,11 +176,11 @@ Collection valueContainer() { } public int acceptedSpanCount() { - return acceptedSpanCount; + return acceptedSpanCount.get(); } public synchronized void clear() { - acceptedSpanCount = 0; + acceptedSpanCount.set(0); traceIdToTraceIdTimeStamps.clear(); spansByTraceIdTimeStamp.clear(); serviceToTraceIds.clear(); @@ -193,6 +195,8 @@ public synchronized void clear() { synchronized void doAccept(List spans) { int delta = spans.size(); + acceptedSpanCount.addAndGet(delta); + int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount; evictToRecoverSpans(spansToRecover); for (Span span : spans) { @@ -201,7 +205,6 @@ synchronized void doAccept(List spans) { TraceIdTimestamp traceIdTimeStamp = new TraceIdTimestamp(lowTraceId, timestamp); spansByTraceIdTimeStamp.put(traceIdTimeStamp, span); traceIdToTraceIdTimeStamps.put(lowTraceId, traceIdTimeStamp); - acceptedSpanCount++; if (!searchEnabled) continue; String serviceName = span.localServiceName(); @@ -285,8 +288,7 @@ private int deleteOldestTrace() { return spansEvicted; } - @Override - public synchronized Call>> getTraces(QueryRequest request) { + @Override public Call>> getTraces(QueryRequest request) { return getTraces(request, strictTraceId); } @@ -340,7 +342,7 @@ public synchronized List> getTraces() { } /** Used for testing. Returns all dependency links unconditionally. */ - public synchronized List getDependencies() { + public List getDependencies() { return LinkDependencies.INSTANCE.map(getTraces()); } @@ -365,8 +367,7 @@ Set traceIdsDescendingByTimestamp(QueryRequest request) { return Collections.unmodifiableSet(result); } - @Override - public synchronized Call> getTrace(String traceId) { + @Override public synchronized Call> getTrace(String traceId) { traceId = Span.normalizeTraceId(traceId); List spans = spansByTraceId(lowTraceId(traceId)); if (spans == null || spans.isEmpty()) return Call.emptyList(); @@ -382,26 +383,24 @@ public synchronized Call> getTrace(String traceId) { return Call.create(filtered); } - @Override public Call> getServiceNames() { + @Override public synchronized Call> getServiceNames() { if (!searchEnabled) return Call.emptyList(); return Call.create(new ArrayList<>(serviceToTraceIds.keySet())); } - @Override public Call> getRemoteServiceNames(String service) { + @Override public synchronized Call> getRemoteServiceNames(String service) { if (service.isEmpty() || !searchEnabled) return Call.emptyList(); service = service.toLowerCase(Locale.ROOT); // service names are always lowercase! return Call.create(new ArrayList<>(serviceToRemoteServiceNames.get(service))); } - @Override - public synchronized Call> getSpanNames(String service) { + @Override public synchronized Call> getSpanNames(String service) { if (service.isEmpty() || !searchEnabled) return Call.emptyList(); service = service.toLowerCase(Locale.ROOT); // service names are always lowercase! return Call.create(new ArrayList<>(serviceToSpanNames.get(service))); } - @Override - public synchronized Call> getDependencies(long endTs, long lookback) { + @Override public Call> getDependencies(long endTs, long lookback) { QueryRequest request = QueryRequest.newBuilder().endTs(endTs).lookback(lookback).limit(Integer.MAX_VALUE).build(); @@ -411,12 +410,12 @@ public synchronized Call> getDependencies(long endTs, long return getTracesCall.map(LinkDependencies.INSTANCE); } - @Override public Call> getKeys() { + @Override public synchronized Call> getKeys() { if (!searchEnabled) return Call.emptyList(); return autocompleteKeysCall.clone(); } - @Override public Call> getValues(String key) { + @Override public synchronized Call> getValues(String key) { if (key == null) throw new NullPointerException("key == null"); if (key.isEmpty()) throw new IllegalArgumentException("key was empty"); if (!searchEnabled) return Call.emptyList();