From 6be6c43d1ea6e0c16e66bd69177ec0ca260ee8aa Mon Sep 17 00:00:00 2001 From: Jesse Yates Date: Thu, 30 Jun 2022 10:02:35 +0200 Subject: [PATCH] Improve freshness verbosity (#60) * Burrow 404 stack traces are omitted They end up clogging the logs as often, due to burrow configs, consumers can be returned but not have any lag information available. This can cause the logs to be filled with useless stack traces of REST 404 errors from the burrow lag queries. Instead, we just log the one-line message containing the Response information - everything that is actually useful in debugging a 404. * Adding LOG.debug statements for calculating consumer freshness This gives consumers a flag to toggle - turning on DEBUG level logging - that though it is verbose, will allow them to see what is actually happening under the hood in the freshness tracker. This can be very useful for debugging 'weird' results from freshness computations. This also meant turning down the Burrow GET request logging from DEBUG to TRACE as we really don't need to see the requests in the logs at DEBUG when we are getting that same response information in ConsumerFreshness (albeit slightly massaged). --- .../tesla/data/consumer/freshness/Burrow.java | 16 ++++++++++++++-- .../consumer/freshness/ConsumerFreshness.java | 12 +++++++++++- .../consumer/freshness/FreshnessTracker.java | 6 ++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java index 9209581..3ee891b 100644 --- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java +++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java @@ -82,7 +82,7 @@ private HttpUrl address(String... paths) { private Map request(String...
paths) throws IOException { Response response = null; HttpUrl url = address(paths); - LOG.debug("GET {}", url); + LOG.trace("GET {}", url); Request request = new Request.Builder() .url(url) .get().build(); @@ -92,7 +92,7 @@ private Map request(String... paths) throws IOException { // burrow will build a valid map with the body for invalid responses (i.e. consumer group not found), so we // need to check that the response was failure. if (!response.isSuccessful()) { - throw new IOException("Response was not successful: " + response); + throw new UnsuccessfulResponseException(response); } return MAPPER.readValue(response.body().byteStream(), Map.class); } catch (IOException e) { @@ -148,4 +148,16 @@ public String getCluster() { return cluster; } } + + /** + * Exception when the request returned unsuccessfully. + */ + public class UnsuccessfulResponseException extends IOException { + public final Response response; + + public UnsuccessfulResponseException(Response response) { + super("Response was not successful: " + response); + this.response = response; + } + } } diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java index a7c730b..48ca651 100644 --- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java +++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java @@ -339,8 +339,17 @@ private ListenableFuture> measureConsumer(Burrow.ClusterCl status = burrow.getConsumerGroupStatus(consumerGroup); Preconditions.checkState(status.get("partitions") != null, "Burrow response is missing partitions, got {}", status); + } catch (Burrow.UnsuccessfulResponseException e) { + // Sometimes the consumer is missing (e.g. 
due to include/exclude rules in burrow), which can clog the logs and + // make it hard to see actual problems. In that case, we just log the message, rather than the full stack trace. + if (e.response.code() == 404) { + LOG.error("Failed to read Burrow status for consumer {}. Skipping\n{}", consumerGroup, e.getMessage()); + } else { + LOG.error("Failed to read Burrow status for consumer {}. Skipping", consumerGroup, e); + } + metrics.error.labels(burrow.getCluster(), consumerGroup).inc(); + return Futures.immediateFuture(Collections.emptyList()); } catch (IOException | IllegalStateException e) { - // this happens sometimes, when burrow is acting up (e.g. "bad" consumer names) LOG.error("Failed to read Burrow status for consumer {}. Skipping", consumerGroup, e); metrics.error.labels(burrow.getCluster(), consumerGroup).inc(); return Futures.immediateFuture(Collections.emptyList()); @@ -374,6 +383,7 @@ private ListenableFuture> measureConsumer(Burrow.ClusterCl @Override public PartitionResult call() { try { + LOG.debug("Calculating freshness for {}", consumerState); tracker.run(); return new PartitionResult(consumerState); } catch (Exception e) { diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java index 0f581a4..45cf8b4 100644 --- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java +++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java @@ -70,6 +70,12 @@ public void run() { } Instant now = Instant.now(clock); freshness = Math.max(now.toEpochMilli() - record.timestamp(), 0); + LOG.debug("Found freshness of {} from first uncommitted record {} for {}", freshness, record, consumer); + } else { + // up-to-date consumers are at the LEO (i.e. 
lag == 0), so we don't have a 'first uncommitted record' that + // would be interesting to log, so just log the freshness == 0. It's in an 'else' to avoid double logging + // for consumers that are not up-to-date. + LOG.debug("{} recording {} ms freshness", consumer, freshness); } metrics.freshness.labels(this.consumer.cluster, this.consumer.group, this.from.topic(), Integer.toString(this.from.partition()))