Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve freshness verbosity #60

Merged
merged 2 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private HttpUrl address(String... paths) {
private Map<String, Object> request(String... paths) throws IOException {
Response response = null;
HttpUrl url = address(paths);
LOG.debug("GET {}", url);
LOG.trace("GET {}", url);
Request request = new Request.Builder()
.url(url)
.get().build();
Expand All @@ -92,7 +92,7 @@ private Map<String, Object> request(String... paths) throws IOException {
// burrow will build a valid map with the body for invalid responses (i.e. consumer group not found), so we
// need to check that the response was failure.
if (!response.isSuccessful()) {
throw new IOException("Response was not successful: " + response);
throw new UnsuccessfulResponseException(response);
}
return MAPPER.readValue(response.body().byteStream(), Map.class);
} catch (IOException e) {
Expand Down Expand Up @@ -148,4 +148,16 @@ public String getCluster() {
return cluster;
}
}

/**
 * Exception thrown when a request completed but the response reported a failure.
 *
 * <p>The offending response is kept in {@link #response} so that handlers can inspect it (for
 * example its status code) and decide how to report the failure.
 *
 * <p>NOTE(review): this is a non-static member class, so each instance is tied to an instance of
 * the enclosing class. If that link is not needed, consider declaring it {@code static} - confirm
 * against the enclosing class before changing.
 */
public class UnsuccessfulResponseException extends IOException {
  private static final long serialVersionUID = 1L;

  /**
   * The unsuccessful response that caused this exception. Always non-null when set by the
   * constructor. Marked {@code transient} on the assumption that {@code Response} is not
   * {@code Serializable} - TODO confirm.
   */
  public final transient Response response;

  /**
   * Creates the exception for the given unsuccessful response.
   *
   * @param response the response that was not successful; must not be {@code null}
   * @throws NullPointerException if {@code response} is {@code null}
   */
  public UnsuccessfulResponseException(Response response) {
    // Fail fast on null: handlers dereference the public field directly, so a null response would
    // otherwise surface later as an NPE inside a catch block.
    super("Response was not successful: " + java.util.Objects.requireNonNull(response, "response"));
    this.response = response;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,17 @@ private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterCl
status = burrow.getConsumerGroupStatus(consumerGroup);
Preconditions.checkState(status.get("partitions") != null,
"Burrow response is missing partitions, got {}", status);
} catch (Burrow.UnsuccessfulResponseException e) {
// Sometimes the consumer is missing (e.g. due to include/exclude rules in burrow), which can clog the logs and
// make it hard to see actual problems. In that case, we just log the message, rather than the full stack trace.
if (e.response.code() == 404) {
LOG.error("Failed to read Burrow status for consumer {}. Skipping\n{}", consumerGroup, e.getMessage());
} else {
LOG.error("Failed to read Burrow status for consumer {}. Skipping", consumerGroup, e);
}
metrics.error.labels(burrow.getCluster(), consumerGroup).inc();
return Futures.immediateFuture(Collections.emptyList());
} catch (IOException | IllegalStateException e) {
// this happens sometimes, when burrow is acting up (e.g. "bad" consumer names)
LOG.error("Failed to read Burrow status for consumer {}. Skipping", consumerGroup, e);
metrics.error.labels(burrow.getCluster(), consumerGroup).inc();
return Futures.immediateFuture(Collections.emptyList());
Expand Down Expand Up @@ -374,6 +383,7 @@ private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterCl
@Override
public PartitionResult call() {
try {
LOG.debug("Calculating freshness for {}", consumerState);
tracker.run();
return new PartitionResult(consumerState);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public void run() {
}
Instant now = Instant.now(clock);
freshness = Math.max(now.toEpochMilli() - record.timestamp(), 0);
LOG.debug("Found freshness of {} from first uncommitted record {} for {}", freshness, record, consumer);
} else {
// up-to-date consumers are at the LEO (i.e. lag == 0), so we don't have a 'first uncommitted record' that
// would be interesting to log, so just log the freshness == 0. It's in an 'else' to avoid double logging
// for consumers that are not up-to-date.
LOG.debug("{} recording {} ms freshness", consumer, freshness);
}
metrics.freshness.labels(this.consumer.cluster, this.consumer.group, this.from.topic(),
Integer.toString(this.from.partition()))
Expand Down