Commit

Fixes accidental hot partition with empty service name
In porting over the all-services query, I accidentally created a piping
hot partition. This reinstates code similar to what we had in the old
schema, except that we can now avoid an all-service fan-out in more cases.
Adrian Cole authored and adriancole committed Nov 14, 2017
1 parent a6764f3 commit 5bb25f5
Showing 7 changed files with 172 additions and 95 deletions.
10 changes: 7 additions & 3 deletions zipkin-storage/zipkin2_cassandra/README.md
@@ -78,14 +78,14 @@ span in trace ID 1 named "targz" created by "service1", taking 200 milliseconds
results in the following rows:

1. `service=service1, span=targz, trace_id=1, duration=200`
- 2. `service=, span=targz, trace_id=1, duration=200`
- 3. `service=, span=, trace_id=1, duration=200`
+ 2. `service=service1, span=, trace_id=1, duration=200`

Here are corresponding queries that relate to the above rows:
1. `GET /api/v2/traces?serviceName=service1&spanName=targz`
1. `GET /api/v2/traces?serviceName=service1&spanName=targz&minDuration=200000`
1. `GET /api/v2/traces?serviceName=service1&minDuration=200000`
2. `GET /api/v2/traces?spanName=targz`
- 3. `GET /api/v2/traces?duration=199500`
+ 2. `GET /api/v2/traces?duration=199500`

As you'll notice, the duration component is optional, and stored in
millisecond resolution as opposed to microsecond (which the query represents).
@@ -96,6 +96,10 @@ search granularity is millisecond, original duration data remains microsecond
granularity. Meanwhile, write performance is dramatically better than writing
discrete values, due to fewer distinct writes.
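To make the resolution trade-off concrete, here is a small standalone example (an editor's illustration, not code from this repo) of the microsecond-to-millisecond conversion described above:

```java
import java.util.concurrent.TimeUnit;

class DurationResolutionExample {
  public static void main(String[] args) {
    // The query API expresses durations in microseconds...
    long minDurationMicros = 200_000L;
    // ...but the index stores milliseconds, so 200000us is indexed as 200ms.
    System.out.println(TimeUnit.MICROSECONDS.toMillis(minDurationMicros)); // 200
    // A minDuration of 199500us truncates to 199ms, so rows indexed at 200ms
    // still satisfy it; the span data itself keeps microsecond precision.
    System.out.println(TimeUnit.MICROSECONDS.toMillis(199_500L)); // 199
  }
}
```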

+ You might wonder how the last two queries work, considering they don't know
+ the service name associated with index rows. When needed, this implementation
+ performs a service name fetch, resulting in a fan-out composition over row 2.
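A hedged sketch of that composition (illustrative names only; the real logic lives in the `CassandraSpanStore` changes later in this diff): the missing service name is resolved by one extra query, then the same lookup fans out per service and the trace IDs are unioned.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Editor's sketch, not the Zipkin classes: getServiceNames() and traceIdsFor()
// stand in for the real services query and the trace_by_service_span SELECT.
class FanOutSketch {
  static List<String> getServiceNames() {
    return Arrays.asList("service1", "service2"); // one extra query
  }

  static Set<String> traceIdsFor(String service, String span) {
    Set<String> ids = new LinkedHashSet<>();
    if ("service1".equals(service) && "targz".equals(span)) ids.add("1");
    return ids;
  }

  public static void main(String[] args) {
    // GET /api/v2/traces?spanName=targz -- no serviceName given
    Set<String> traceIds = new LinkedHashSet<>();
    for (String service : getServiceNames()) { // fan out: one lookup per service
      traceIds.addAll(traceIdsFor(service, "targz"));
    }
    System.out.println(traceIds); // [1]
  }
}
```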

### Time-To-Live
Time-To-Live is now set by default at the table level. It cannot be overridden in write requests.

@@ -18,7 +18,6 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;
- import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.LinkedHashSet;
@@ -82,37 +81,27 @@ public Call<Void> accept(List<Span> input) {

// Empty values allow for api queries with blank service or span name
String service = s.localServiceName() != null ? s.localServiceName() : "";
- String span = null != s.name() ? s.name() : "";
+ String span = null != s.name() ? s.name() : ""; // Empty value allows for api queries without span name

// service span index is refreshed regardless of timestamp
if (null != s.remoteServiceName()) { // allows getServices to return remote service names
+ // TODO: this is busy-work as there's no query by remote service name!
serviceSpans.add(insertServiceSpanName.newInput(s.remoteServiceName(), span));
}
+ if (null == s.localServiceName()) continue; // don't index further w/o a service name

- if (!service.equals("")) {
- serviceSpans.add(insertServiceSpanName.newInput(service, span));
- }
+ serviceSpans.add(insertServiceSpanName.newInput(service, span));

if (ts_micro == 0L) continue; // search is only valid with a timestamp, don't index w/o it!
int bucket = durationIndexBucket(ts_micro); // duration index is milliseconds not microseconds
Long duration = null != s.duration() ? TimeUnit.MICROSECONDS.toMillis(s.duration()) : null;
traceByServiceSpans.add(
insertTraceServiceSpanName.newInput(service, span, bucket, ts_uuid, s.traceId(), duration)
);
- if (!service.isEmpty()) {
- traceByServiceSpans.add( // Allows lookup without the service name
- insertTraceServiceSpanName.newInput("", span, bucket, ts_uuid, s.traceId(), duration)
- );
- }
if (span.isEmpty()) continue;
traceByServiceSpans.add( // Allows lookup without the span name
insertTraceServiceSpanName.newInput(service, "", bucket, ts_uuid, s.traceId(), duration)
);
- if (!service.isEmpty()) {
- traceByServiceSpans.add( // Allows lookup without the service name
- insertTraceServiceSpanName.newInput("", "", bucket, ts_uuid, s.traceId(), duration)
- );
- }
}
List<Call<ResultSet>> calls = new ArrayList<>();
for (InsertSpan.Input span : spans) {
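Summarizing the write path after this change, here is a hedged sketch (illustrative names, not the consumer code above) of which `trace_by_service_span` rows a span now produces, and why the removed branches were the hot partition:

```java
import java.util.ArrayList;
import java.util.List;

// Editor's sketch: indexRows() models the partition keys written per span after
// this fix; it is not the actual consumer class.
class IndexRowsSketch {
  static List<String> indexRows(String service, String span) {
    List<String> rows = new ArrayList<>();
    if (service.isEmpty()) return rows; // no local service name: don't index
    rows.add("service=" + service + ", span=" + span);
    if (!span.isEmpty()) rows.add("service=" + service + ", span="); // span-less lookup
    // Before this commit, (service="", span) and (service="", span="") rows were
    // also written for every span, so the whole cluster's writes hashed to the
    // same empty-service partitions -- the accidental hot partition.
    return rows;
  }

  public static void main(String[] args) {
    System.out.println(indexRows("service1", "targz"));
    // [service=service1, span=targz, service=service1, span=]
  }
}
```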
@@ -30,7 +30,6 @@
import zipkin2.Span;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.SpanStore;
- import zipkin2.storage.cassandra.internal.call.AggregateIntoSet;
import zipkin2.storage.cassandra.internal.call.IntersectKeySets;

import static zipkin2.storage.cassandra.CassandraUtil.traceIdsSortedByDescTimestamp;
@@ -70,17 +69,15 @@ final class CassandraSpanStore implements SpanStore {
}

/**
-  * This fans out into a number of requests. The returned future will fail if any of the inputs
-  * fail.
+  * This fans out into a number of requests corresponding to query input. In the simplest case,
+  * there is less than a day of data queried and only one expression. This implies one call to
+  * fetch trace IDs and another to retrieve the span details.
*
-  * <p>When {@link QueryRequest#serviceName service name} is unset, service names will be fetched
-  * eagerly, implying an additional query.
-  *
-  * <p>The duration query is the most expensive query in cassandra, as it turns into 1 request per
-  * hour of {@link QueryRequest#lookback lookback}. Because many times lookback is set to a day,
-  * this means 24 requests to the backend!
-  *
-  * <p>See https://github.com/openzipkin/zipkin-java/issues/200
+  * <p>The number of backend calls increases with query complexity, days of data, and the limit
+  * of traces requested. For example, a query like "http.path=/foo and error" will be two select
+  * statements for the expression, possibly with follow-up calls for pagination (when over 5K
+  * rows match). Once IDs are parsed, there's one call for each 5K rows of span data. This means
+  * "http.path=/foo and error" is minimally 3 network calls, the first two in parallel.
*/
@Override
public Call<List<List<Span>>> getTraces(QueryRequest request) {
@@ -97,20 +94,6 @@ Call<List<List<Span>>> doGetTraces(QueryRequest request) {
// If we have to make multiple queries, over fetch on indexes as they don't return distinct
// (trace id, timestamp) rows. This mitigates intersection resulting in < limit traces
final int traceIndexFetchSize = request.limit() * indexFetchMultiplier;

- // Allows GET /api/v2/traces
- if (request.serviceName() == null && request.minDuration() == null
- && request.spanName() == null && request.annotationQuery().isEmpty()) {
- // NOTE: When we scan the span table, we can't shortcut this and just return spans that match.
- // If we did, we'd only return pieces of the trace as opposed to the entire trace.
- return spanTable.newCall(timestampRange, traceIndexFetchSize)
- .map(collapseToMap)
- .map(traceIdsSortedByDescTimestamp())
- .flatMap(spans.newFlatMapper(request.limit()));
- }
-
- // While a valid port of the scala cassandra span store (from zipkin 1.35), there is a fault.
- // each annotation key is an intersection, meaning we likely return < traceIndexFetchSize.
List<Call<Map<String, Long>>> callsToIntersect = new ArrayList<>();

List<String> annotationKeys = CassandraUtil.annotationKeys(request);
@@ -123,8 +106,39 @@ Call<List<List<Span>>> doGetTraces(QueryRequest request) {
).map(collapseToMap));
}

- // trace_by_service_span adds special empty-string keys in order to search by all
- String serviceName = null != request.serviceName() ? request.serviceName() : "";
+ // Bucketed calls can be expensive when service name isn't specified. This guards against abuse.
+ if (request.spanName() != null || request.minDuration() != null || callsToIntersect.isEmpty()) {
+ Call<Set<Entry<String, Long>>> bucketedTraceIdCall =
+ newBucketedTraceIdCall(request, timestampRange, traceIndexFetchSize);
+ callsToIntersect.add(bucketedTraceIdCall.map(collapseToMap));
+ }
+
+ if (callsToIntersect.size() == 1) {
+ return callsToIntersect.get(0)
+ .map(traceIdsSortedByDescTimestamp())
+ .flatMap(spans.newFlatMapper(request.limit()));
+ }
+
+ // We achieve the AND goal, by intersecting each of the key sets.
+ IntersectKeySets intersectedTraceIds = new IntersectKeySets(callsToIntersect);
+ // @xxx the sorting by timestamp desc is broken here^
+ return intersectedTraceIds.flatMap(spans.newFlatMapper(request.limit()));
+ }
+
+ /**
+  * Creates a call representing one or more queries against {@link Schema#TABLE_TRACE_BY_SERVICE_SPAN}.
+  * The result will be an aggregate if the input request's serviceName is null or there's more
+  * than one day of data in the timestamp range.
+  *
+  * <p>Note that when {@link QueryRequest#serviceName()} is null, the returned query composes over
+  * {@link #getServiceNames()}. This means that if you have 1000 service names, you will end up
+  * with a composition of at least 1000 calls.
+  */
+ // TODO: smartly handle when serviceName is null. For example, rank recently written serviceNames
+ // and speculatively query those first.
+ Call<Set<Entry<String, Long>>> newBucketedTraceIdCall(QueryRequest request,
+ TimestampRange timestampRange, int traceIndexFetchSize) {
+ // trace_by_service_span adds special empty-string span name in order to search by all
String spanName = null != request.spanName() ? request.spanName() : "";
Long minDuration = request.minDuration(), maxDuration = request.maxDuration();
int startBucket = CassandraUtil.durationIndexBucket(timestampRange.startMillis * 1000);
@@ -134,12 +148,14 @@ Call<List<List<Span>>> doGetTraces(QueryRequest request) {
"Start bucket (" + startBucket + ") > end bucket (" + endBucket + ")");
}

+ // template input with an empty service name, potentially revisiting later
+ String serviceName = null != request.serviceName() ? request.serviceName() : "";

// TODO: ideally, the buckets are traversed backwards, only spawning queries for older buckets
// if younger buckets are empty. This will be an async continuation, punted for now.

- List<Call<Set<Entry<String, Long>>>> bucketedTraceIdCalls = new ArrayList<>();
+ List<SelectTraceIdsFromServiceSpan.Input> bucketedTraceIdInputs = new ArrayList<>();
for (int bucket = endBucket; bucket >= startBucket; bucket--) {
- bucketedTraceIdCalls.add(traceIdsFromServiceSpan.newCall(
+ bucketedTraceIdInputs.add(traceIdsFromServiceSpan.newInput(
serviceName,
spanName,
bucket,
@@ -149,23 +165,17 @@ Call<List<List<Span>>> doGetTraces(QueryRequest request) {
traceIndexFetchSize)
);
}
- // Unlikely, but we could have a single bucket
- callsToIntersect.add((bucketedTraceIdCalls.size() == 1
- ? bucketedTraceIdCalls.get(0)
- : new AggregateIntoSet<>(bucketedTraceIdCalls)
- ).map(collapseToMap));
-
- assert !callsToIntersect.isEmpty() : request + " resulted in no trace ID calls";
- if (callsToIntersect.size() == 1) {
- return callsToIntersect.get(0)
- .map(traceIdsSortedByDescTimestamp())
- .flatMap(spans.newFlatMapper(request.limit()));
+ Call<Set<Entry<String, Long>>> bucketedTraceIdCall;
+ if ("".equals(serviceName)) {
+ // If we have no service name, we have to look up service names before running trace ID queries
+ bucketedTraceIdCall = getServiceNames().flatMap(
+ traceIdsFromServiceSpan.newFlatMapper(bucketedTraceIdInputs)
+ );
+ } else {
+ bucketedTraceIdCall = traceIdsFromServiceSpan.newCall(bucketedTraceIdInputs);
}

- // We achieve the AND goal, by intersecting each of the key sets.
- IntersectKeySets intersectedTraceIds = new IntersectKeySets(callsToIntersect);
- // @xxx the sorting by timestamp desc is broken here^
- return intersectedTraceIds.flatMap(spans.newFlatMapper(request.limit()));
+ return bucketedTraceIdCall;
}

@Override public Call<List<Span>> getTrace(String traceId) {
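For reference, the AND step above can be pictured with plain maps. This is a hedged stand-in for `IntersectKeySets` (semantics assumed from its use in `doGetTraces`, not the actual class): each index query yields trace ID to timestamp entries, and only keys present in every result survive.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Editor's sketch of intersecting key sets; not zipkin2's IntersectKeySets.
class IntersectSketch {
  public static void main(String[] args) {
    Map<String, Long> byAnnotation = new LinkedHashMap<>();
    byAnnotation.put("a", 1L); // trace IDs matching "error"
    byAnnotation.put("b", 2L);

    Map<String, Long> byServiceSpan = new LinkedHashMap<>();
    byServiceSpan.put("b", 2L); // trace IDs matching http.path=/foo
    byServiceSpan.put("c", 3L);

    List<Map<String, Long>> results = Arrays.asList(byAnnotation, byServiceSpan);
    Set<String> intersection = new LinkedHashSet<>(results.get(0).keySet());
    for (Map<String, Long> result : results) {
      intersection.retainAll(result.keySet()); // AND: keep keys common to all
    }
    System.out.println(intersection); // [b]
  }
}
```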
@@ -44,7 +44,10 @@ final class CassandraUtil {
*/
static final int LONGEST_VALUE_TO_INDEX = 256;

- // Time window covered by a single bucket of the Span Duration Index, in seconds. Default: 1 day
+ /**
+  * Time window covered by a single bucket of the {@link Schema#TABLE_TRACE_BY_SERVICE_SPAN}, in
+  * seconds. Default: 1 day
+  */
private static final long DURATION_INDEX_BUCKET_WINDOW_SECONDS
= Long.getLong("zipkin.store.cassandra.internal.durationIndexBucket", 24 * 60 * 60);
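A sketch of how a timestamp could map to a bucket given that window (hedged: consistent with the constant above, but not necessarily the exact production arithmetic):

```java
import java.util.concurrent.TimeUnit;

// Editor's sketch: a microsecond timestamp maps to the count of whole windows
// (default one day) since the epoch.
class DurationIndexBucketSketch {
  static final long WINDOW_SECONDS = 24 * 60 * 60; // mirrors the default above

  static int durationIndexBucket(long tsMicro) {
    return (int) (TimeUnit.MICROSECONDS.toSeconds(tsMicro) / WINDOW_SECONDS);
  }

  public static void main(String[] args) {
    long commitDayMicros = 1_510_617_600_000_000L; // 2017-11-14T00:00:00Z
    System.out.println(durationIndexBucket(commitDayMicros)); // 17484
  }
}
```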

@@ -100,9 +103,12 @@ enum TraceIdsSortedByDescTimestamp implements Call.Mapper<Map<String, Long>, Set<String>>
@Override public Set<String> map(Map<String, Long> map) {
// timestamps can collide, so we need to add some random digits on end before using them as serviceSpanKeys
SortedMap<BigInteger, String> sorted = new TreeMap<>(Collections.reverseOrder());
- map.forEach((key, value) -> sorted.put(
- BigInteger.valueOf(value).multiply(OFFSET).add(BigInteger.valueOf(RAND.nextInt())), key)
- );
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ BigInteger uncollided = BigInteger.valueOf(entry.getValue())
+ .multiply(OFFSET)
+ .add(BigInteger.valueOf(RAND.nextInt()));
+ sorted.put(uncollided, entry.getKey());
+ }
return new LinkedHashSet<>(sorted.values());
}

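The rewritten loop above exists because two trace IDs can share a timestamp; a plain `TreeMap<Long, String>` keyed by timestamp would silently drop one. A hedged standalone illustration follows (`OFFSET` and `RAND` are assumed stand-ins for the class's fields, which this diff doesn't show):

```java
import java.math.BigInteger;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;

// Editor's sketch of collision-free descending-timestamp ordering.
class DescTimestampSketch {
  static final BigInteger OFFSET = BigInteger.valueOf(Integer.MAX_VALUE); // assumption
  static final Random RAND = new Random();

  public static void main(String[] args) {
    Map<String, Long> input = new LinkedHashMap<>();
    input.put("traceA", 1000L); // traceA and traceB tie on timestamp: a naive
    input.put("traceB", 1000L); // TreeMap<Long, String> would keep only one
    input.put("traceC", 2000L);

    SortedMap<BigInteger, String> sorted = new TreeMap<>(Collections.reverseOrder());
    for (Map.Entry<String, Long> entry : input.entrySet()) {
      BigInteger uncollided = BigInteger.valueOf(entry.getValue())
        .multiply(OFFSET) // spreads timestamps so far apart that the random
        .add(BigInteger.valueOf(RAND.nextInt())); // suffix cannot reorder them
      sorted.put(uncollided, entry.getKey());
    }
    System.out.println(new LinkedHashSet<>(sorted.values()));
    // traceC (newest) first; traceA and traceB both retained, in random order
  }
}
```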
@@ -22,7 +22,9 @@
import com.datastax.driver.core.utils.UUIDs;
import com.google.auto.value.AutoValue;
import java.util.AbstractMap;
+ import java.util.ArrayList;
import java.util.LinkedHashSet;
+ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
@@ -32,6 +34,7 @@
import zipkin2.internal.Nullable;
import zipkin2.storage.cassandra.CassandraSpanStore.TimestampRange;
import zipkin2.storage.cassandra.internal.call.AccumulateAllResults;
+ import zipkin2.storage.cassandra.internal.call.AggregateIntoSet;
import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall;

import static zipkin2.storage.cassandra.Schema.TABLE_TRACE_BY_SERVICE_SPAN;
@@ -53,6 +56,19 @@ final class SelectTraceIdsFromServiceSpan extends ResultSetFutureCall {
abstract UUID end_ts();

abstract int limit_();

+ Input withService(String service) {
+ return new AutoValue_SelectTraceIdsFromServiceSpan_Input(
+ service,
+ span(),
+ bucket(),
+ start_duration(),
+ end_duration(),
+ start_ts(),
+ end_ts(),
+ limit_()
+ );
+ }
}

static class Factory {
@@ -88,7 +104,7 @@ static class Factory {
);
}

- Call<Set<Entry<String, Long>>> newCall(
+ Input newInput(
String serviceName,
String spanName,
int bucket,
@@ -101,8 +117,7 @@ Call<Set<Entry<String, Long>>> newCall(
start_duration = minDurationMicros / 1000L;
end_duration = maxDurationMicros != null ? maxDurationMicros / 1000L : Long.MAX_VALUE;
}

- Input input = new AutoValue_SelectTraceIdsFromServiceSpan_Input(
+ return new AutoValue_SelectTraceIdsFromServiceSpan_Input(
serviceName,
spanName,
bucket,
@@ -112,14 +127,59 @@
timestampRange.endUUID,
limit
);
+ }
+
+ Call<Set<Entry<String, Long>>> newCall(List<Input> inputs) {
+ if (inputs.size() == 1) return newCall(inputs.get(0));
+
+ List<Call<Set<Entry<String, Long>>>> bucketedTraceIdCalls = new ArrayList<>();
+ for (SelectTraceIdsFromServiceSpan.Input input : inputs) {
+ bucketedTraceIdCalls.add(newCall(input));
+ }
+ return new AggregateIntoSet<>(bucketedTraceIdCalls);
+ }
+
+ /** Applies all deferred service names to all input templates */
+ FlatMapper<List<String>, Set<Entry<String, Long>>> newFlatMapper(List<Input> inputTemplates) {
+ return new FlatMapServicesToInputs(inputTemplates);
+ }
+
+ Call<Set<Entry<String, Long>>> newCall(Input input) {
return new SelectTraceIdsFromServiceSpan(
this,
- minDurationMicros != null
+ input.start_duration() != null
? selectTraceIdsByServiceSpanNameAndDuration
: selectTraceIdsByServiceSpanName,
input
).flatMap(new AccumulateTraceIdTsUuid());
}

+ class FlatMapServicesToInputs implements FlatMapper<List<String>, Set<Entry<String, Long>>> {
+ final List<SelectTraceIdsFromServiceSpan.Input> inputTemplates;
+
+ FlatMapServicesToInputs(List<SelectTraceIdsFromServiceSpan.Input> inputTemplates) {
+ this.inputTemplates = inputTemplates;
+ }
+
+ @Override public Call<Set<Entry<String, Long>>> map(List<String> serviceNames) {
+ List<Call<Set<Entry<String, Long>>>> bucketedTraceIdCalls = new ArrayList<>();
+
+ for (String service : serviceNames) { // fan out every input for each service name
+ List<SelectTraceIdsFromServiceSpan.Input> scopedInputs = new ArrayList<>();
+ for (SelectTraceIdsFromServiceSpan.Input input : inputTemplates) {
+ scopedInputs.add(input.withService(service));
+ }
+ bucketedTraceIdCalls.add(newCall(scopedInputs));
+ }
+
+ if (bucketedTraceIdCalls.size() == 1) return bucketedTraceIdCalls.get(0);
+ return new AggregateIntoSet<>(bucketedTraceIdCalls);
+ }
+
+ @Override public String toString() {
+ return "FlatMapServicesToInputs{" + inputTemplates + "}";
+ }
+ }
}

final Factory factory;
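Tying the pieces together, a hedged model of the two composition paths the span store chooses between (plain printlns stand in for `zipkin2.Call` composition; names are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Editor's sketch: with a service name, bucketed inputs run directly; without
// one, the same input templates are re-scoped to every fetched service name,
// mirroring Factory.newFlatMapper / FlatMapServicesToInputs above.
class CompositionSketch {
  public static void main(String[] args) {
    List<String> inputTemplates = Arrays.asList("bucket=17484", "bucket=17483");
    String serviceName = ""; // empty means the request had no serviceName

    if (serviceName.isEmpty()) {
      List<String> services = Arrays.asList("service1", "service2"); // getServiceNames()
      for (String service : services) {
        for (String template : inputTemplates) {
          System.out.println("SELECT trace ids where service=" + service + ", " + template);
        }
      }
    } else {
      for (String template : inputTemplates) {
        System.out.println("SELECT trace ids where service=" + serviceName + ", " + template);
      }
    }
  }
}
```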
