Skip to content

Commit

Permalink
Fixes thread safety bug on in-memory storage regarding accept count (#…
Browse files Browse the repository at this point in the history
…2779)

Before, we were using an unsafe means to increment a counter for the
in-memory storage. This contributed to a test flake. This fixes the
state bug and also tightens up the flakey test.

The flakey test still exists, so is disabled for now.
  • Loading branch information
adriancole authored Aug 24, 2019
1 parent 6dde6e4 commit afd63b5
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

import com.linecorp.armeria.server.Server;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -38,17 +40,21 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static zipkin2.TestObjects.DAY;
import static zipkin2.TestObjects.TRACE;
import static zipkin2.TestObjects.TODAY;
import static zipkin2.server.internal.ITZipkinServer.url;

/**
* This class is flaky for as yet unknown reasons. For example, in Travis, sometimes assertions fail
* due to incomplete traces. Hence, it includes more assertion customization than normal.
*/
@SpringBootTest(
classes = ZipkinServer.class,
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"spring.config.name=zipkin-server",
"zipkin.self-tracing.enabled=true",
"zipkin.self-tracing.message-timeout=1ms",
"zipkin.self-tracing.traces-per-second=10"
"zipkin.self-tracing.message-timeout=100ms",
"zipkin.self-tracing.traces-per-second=100"
})
@RunWith(SpringRunner.class)
public class ITZipkinSelfTracing {
Expand All @@ -67,36 +73,38 @@ InMemoryStorage inMemoryStorage() {
}

@Test public void getIsTraced_v2() throws Exception {
assertThat(get("v2").body().string()).isEqualTo("[]");
assertThat(getServices("v2").body().string()).isEqualTo("[]");

awaitSpans();
List<List<Span>> traces = awaitSpans(2);

assertThat(getTraces(QueryRequest.newBuilder()
.annotationQuery(singletonMap("http.path", "/api/v2/services")))).isNotEmpty();
assertQueryReturnsResults(QueryRequest.newBuilder()
.annotationQuery(singletonMap("http.path", "/api/v2/services")), traces);

assertThat(getTraces(QueryRequest.newBuilder().spanName("get-service-names"))).isNotEmpty();
assertQueryReturnsResults(QueryRequest.newBuilder().spanName("get-service-names"), traces);
}

@Test public void postIsTraced_v1() throws Exception {
post("v1");
@Test @Ignore("https://github.com/openzipkin/zipkin/issues/2781")
public void postIsTraced_v1() throws Exception {
postSpan("v1");

awaitSpans();
List<List<Span>> traces = awaitSpans(3); // test span + POST + accept-spans

assertThat(getTraces(QueryRequest.newBuilder()
.annotationQuery(singletonMap("http.path", "/api/v1/spans")))).isNotEmpty();
assertQueryReturnsResults(QueryRequest.newBuilder()
.annotationQuery(singletonMap("http.path", "/api/v1/spans")), traces);

assertThat(getTraces(QueryRequest.newBuilder().spanName("accept-spans"))).isNotEmpty();
assertQueryReturnsResults(QueryRequest.newBuilder().spanName("accept-spans"), traces);
}

@Test public void postIsTraced_v2() throws Exception {
post("v2");
@Test @Ignore("https://github.com/openzipkin/zipkin/issues/2781")
public void postIsTraced_v2() throws Exception {
postSpan("v2");

awaitSpans();
List<List<Span>> traces = awaitSpans(3); // test span + POST + accept-spans

assertThat(getTraces(QueryRequest.newBuilder()
.annotationQuery(singletonMap("http.path", "/api/v2/spans")))).isNotEmpty();
assertQueryReturnsResults(QueryRequest.newBuilder()
.annotationQuery(singletonMap("http.path", "/api/v2/spans")), traces);

assertThat(getTraces(QueryRequest.newBuilder().spanName("accept-spans"))).isNotEmpty();
assertQueryReturnsResults(QueryRequest.newBuilder().spanName("accept-spans"), traces);
}

/**
Expand All @@ -110,31 +118,58 @@ InMemoryStorage inMemoryStorage() {
assertThat(reporter).hasToString("AsyncReporter{StorageComponent}");
}

void awaitSpans() {
await().untilAsserted(// wait for spans
() -> assertThat(inMemoryStorage().acceptedSpanCount()).isGreaterThanOrEqualTo(1));
List<List<Span>> awaitSpans(int count) {
await().untilAsserted(() -> { // wait for spans
List<List<Span>> traces = inMemoryStorage().getTraces();
long received = traces.stream().flatMap(List::stream).count();
assertThat(inMemoryStorage().acceptedSpanCount())
.withFailMessage("Wanted %s spans: got %s. Current traces: %s", count, received, traces)
.isGreaterThanOrEqualTo(count);
});
return inMemoryStorage().getTraces();
}

void assertQueryReturnsResults(QueryRequest.Builder builder, List<List<Span>> traces)
throws IOException {
QueryRequest query = builder.endTs(System.currentTimeMillis()).lookback(DAY).limit(2).build();
assertThat(inMemoryStorage().getTraces(query).execute())
.withFailMessage("Expected results from %s. Current traces: %s", query, traces)
.isNotEmpty();
}

void post(String version) throws IOException {
/**
* This POSTs a single span. Afterwards, we expect this trace in storage, and also the self-trace
* of POSTing it.
*/
void postSpan(String version) throws IOException {
SpanBytesEncoder encoder =
"v1".equals(version) ? SpanBytesEncoder.JSON_V1 : SpanBytesEncoder.JSON_V2;
client.newCall(new Request.Builder()

List<Span> testTrace = Collections.singletonList(
Span.newBuilder().timestamp(TODAY).traceId("1").id("2").name("test-trace").build()
);

Response response = client.newCall(new Request.Builder()
.url(url(server, "/api/" + version + "/spans"))
.post(RequestBody.create(null, encoder.encodeList(TRACE)))
.post(RequestBody.create(null, encoder.encodeList(testTrace)))
.build())
.execute();
assertSuccessful(response);
}

List<List<Span>> getTraces(QueryRequest.Builder request) throws IOException {
return inMemoryStorage().getTraces(
request.endTs(System.currentTimeMillis()).lookback(DAY).limit(2).build()
).execute();
}

Response get(String version) throws IOException {
return client.newCall(new Request.Builder()
Response getServices(String version) throws IOException {
Response response = client.newCall(new Request.Builder()
.url(url(server, "/api/" + version + "/services"))
.build())
.execute();
assertSuccessful(response);
return response;
}

static void assertSuccessful(Response response) throws IOException {
assertThat(response.isSuccessful())
.withFailMessage("unsuccessful %s: %s", response.request(),
response.peekBody(Long.MAX_VALUE).string())
.isTrue();
}
}
33 changes: 16 additions & 17 deletions zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.DependencyLink;
Expand Down Expand Up @@ -163,7 +165,7 @@ Collection<String> valueContainer() {
final int maxSpanCount;
final Call<List<String>> autocompleteKeysCall;
final Set<String> autocompleteKeys;
volatile int acceptedSpanCount;
final AtomicInteger acceptedSpanCount = new AtomicInteger();

InMemoryStorage(Builder builder) {
this.strictTraceId = builder.strictTraceId;
Expand All @@ -174,11 +176,11 @@ Collection<String> valueContainer() {
}

public int acceptedSpanCount() {
return acceptedSpanCount;
return acceptedSpanCount.get();
}

public synchronized void clear() {
acceptedSpanCount = 0;
acceptedSpanCount.set(0);
traceIdToTraceIdTimeStamps.clear();
spansByTraceIdTimeStamp.clear();
serviceToTraceIds.clear();
Expand All @@ -193,6 +195,8 @@ public synchronized void clear() {

synchronized void doAccept(List<Span> spans) {
int delta = spans.size();
acceptedSpanCount.addAndGet(delta);

int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount;
evictToRecoverSpans(spansToRecover);
for (Span span : spans) {
Expand All @@ -201,7 +205,6 @@ synchronized void doAccept(List<Span> spans) {
TraceIdTimestamp traceIdTimeStamp = new TraceIdTimestamp(lowTraceId, timestamp);
spansByTraceIdTimeStamp.put(traceIdTimeStamp, span);
traceIdToTraceIdTimeStamps.put(lowTraceId, traceIdTimeStamp);
acceptedSpanCount++;

if (!searchEnabled) continue;
String serviceName = span.localServiceName();
Expand Down Expand Up @@ -285,8 +288,7 @@ private int deleteOldestTrace() {
return spansEvicted;
}

@Override
public synchronized Call<List<List<Span>>> getTraces(QueryRequest request) {
@Override public Call<List<List<Span>>> getTraces(QueryRequest request) {
return getTraces(request, strictTraceId);
}

Expand Down Expand Up @@ -340,7 +342,7 @@ public synchronized List<List<Span>> getTraces() {
}

/** Used for testing. Returns all dependency links unconditionally. */
public synchronized List<DependencyLink> getDependencies() {
public List<DependencyLink> getDependencies() {
return LinkDependencies.INSTANCE.map(getTraces());
}

Expand All @@ -365,8 +367,7 @@ Set<String> traceIdsDescendingByTimestamp(QueryRequest request) {
return Collections.unmodifiableSet(result);
}

@Override
public synchronized Call<List<Span>> getTrace(String traceId) {
@Override public synchronized Call<List<Span>> getTrace(String traceId) {
traceId = Span.normalizeTraceId(traceId);
List<Span> spans = spansByTraceId(lowTraceId(traceId));
if (spans == null || spans.isEmpty()) return Call.emptyList();
Expand All @@ -382,26 +383,24 @@ public synchronized Call<List<Span>> getTrace(String traceId) {
return Call.create(filtered);
}

@Override public Call<List<String>> getServiceNames() {
@Override public synchronized Call<List<String>> getServiceNames() {
if (!searchEnabled) return Call.emptyList();
return Call.create(new ArrayList<>(serviceToTraceIds.keySet()));
}

@Override public Call<List<String>> getRemoteServiceNames(String service) {
@Override public synchronized Call<List<String>> getRemoteServiceNames(String service) {
if (service.isEmpty() || !searchEnabled) return Call.emptyList();
service = service.toLowerCase(Locale.ROOT); // service names are always lowercase!
return Call.create(new ArrayList<>(serviceToRemoteServiceNames.get(service)));
}

@Override
public synchronized Call<List<String>> getSpanNames(String service) {
@Override public synchronized Call<List<String>> getSpanNames(String service) {
if (service.isEmpty() || !searchEnabled) return Call.emptyList();
service = service.toLowerCase(Locale.ROOT); // service names are always lowercase!
return Call.create(new ArrayList<>(serviceToSpanNames.get(service)));
}

@Override
public synchronized Call<List<DependencyLink>> getDependencies(long endTs, long lookback) {
@Override public Call<List<DependencyLink>> getDependencies(long endTs, long lookback) {
QueryRequest request =
QueryRequest.newBuilder().endTs(endTs).lookback(lookback).limit(Integer.MAX_VALUE).build();

Expand All @@ -411,12 +410,12 @@ public synchronized Call<List<DependencyLink>> getDependencies(long endTs, long
return getTracesCall.map(LinkDependencies.INSTANCE);
}

@Override public Call<List<String>> getKeys() {
@Override public synchronized Call<List<String>> getKeys() {
if (!searchEnabled) return Call.emptyList();
return autocompleteKeysCall.clone();
}

@Override public Call<List<String>> getValues(String key) {
@Override public synchronized Call<List<String>> getValues(String key) {
if (key == null) throw new NullPointerException("key == null");
if (key.isEmpty()) throw new IllegalArgumentException("key was empty");
if (!searchEnabled) return Call.emptyList();
Expand Down

0 comments on commit afd63b5

Please sign in to comment.