From 90147a347ff62de326e37b3d353206806d3186a3 Mon Sep 17 00:00:00 2001
From: Jesse Yates
Date: Wed, 29 Jun 2022 11:50:55 +0200
Subject: [PATCH 1/2] Omit Burrow 404 stack traces

These stack traces end up clogging the logs: often, due to Burrow
configs, consumers can be returned without any lag information
available, which fills the logs with useless stack traces of REST 404
errors from the Burrow lag queries. Instead, we just log the one-line
message containing the Response information - everything that is
actually useful in debugging a 404.
---
 .../com/tesla/data/consumer/freshness/Burrow.java  | 14 +++++++++++++-
 .../data/consumer/freshness/ConsumerFreshness.java | 11 ++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java
index 9209581..536bf9e 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java
@@ -92,7 +92,7 @@ private Map request(String... paths) throws IOException {
       // burrow will build a valid map with the body for invalid responses (i.e. consumer group not found), so we
       // need to check that the response was failure.
       if (!response.isSuccessful()) {
-        throw new IOException("Response was not successful: " + response);
+        throw new UnsuccessfulResponseException(response);
       }
       return MAPPER.readValue(response.body().byteStream(), Map.class);
     } catch (IOException e) {
@@ -148,4 +148,16 @@ public String getCluster() {
       return cluster;
     }
   }
+
+  /**
+   * Exception when the request returned unsuccessfully.
+   */
+  public class UnsuccessfulResponseException extends IOException {
+    public final Response response;
+
+    public UnsuccessfulResponseException(Response response) {
+      super("Response was not successful: " + response);
+      this.response = response;
+    }
+  }
 }
diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
index a7c730b..334ba4b 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
@@ -339,8 +339,17 @@ private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterCl
       status = burrow.getConsumerGroupStatus(consumerGroup);
       Preconditions.checkState(status.get("partitions") != null,
           "Burrow response is missing partitions, got {}", status);
+    } catch (Burrow.UnsuccessfulResponseException e) {
+      // Sometimes the consumer is missing (e.g. due to include/exclude rules in burrow); the resulting stack traces
+      // can clog the logs and make it hard to see actual problems. In that case, we just log the message, not the full stack trace.
+      if (e.response.code() == 404) {
+        LOG.error("Failed to read Burrow status for consumer {}. Skipping\n{}", consumerGroup, e.getMessage());
+      } else {
+        LOG.error("Failed to read Burrow status for consumer {}. Skipping", consumerGroup, e);
+      }
+      metrics.error.labels(burrow.getCluster(), consumerGroup).inc();
+      return Futures.immediateFuture(Collections.emptyList());
     } catch (IOException | IllegalStateException e) {
-      // this happens sometimes, when burrow is acting up (e.g. "bad" consumer names)
       LOG.error("Failed to read Burrow status for consumer {}. Skipping", consumerGroup, e);
       metrics.error.labels(burrow.getCluster(), consumerGroup).inc();
       return Futures.immediateFuture(Collections.emptyList());

From 4853a02fe5d90f9230b13e141243f3297e973abe Mon Sep 17 00:00:00 2001
From: Jesse Yates
Date: Wed, 29 Jun 2022 12:01:44 +0200
Subject: [PATCH 2/2] Add LOG.debug statements for calculating consumer
 freshness

This gives users a flag to toggle - turning on DEBUG-level logging -
that, though verbose, lets them see what is actually happening under
the hood in the freshness tracker. This can be very useful for
debugging 'weird' results from freshness computations.

This also meant turning down the Burrow GET request logging from DEBUG
to TRACE, as we don't need to see the requests in the logs at DEBUG
when we are getting the same response information in ConsumerFreshness
(albeit slightly massaged).
---
 .../main/java/com/tesla/data/consumer/freshness/Burrow.java | 2 +-
 .../tesla/data/consumer/freshness/ConsumerFreshness.java    | 1 +
 .../com/tesla/data/consumer/freshness/FreshnessTracker.java | 6 ++++++
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java
index 536bf9e..3ee891b 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/Burrow.java
@@ -82,7 +82,7 @@ private HttpUrl address(String... paths) {
   private Map request(String... paths) throws IOException {
     Response response = null;
     HttpUrl url = address(paths);
-    LOG.debug("GET {}", url);
+    LOG.trace("GET {}", url);
     Request request = new Request.Builder()
         .url(url)
         .get().build();
diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
index 334ba4b..48ca651 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
@@ -383,6 +383,7 @@ private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterCl
         @Override
         public PartitionResult call() {
           try {
+            LOG.debug("Calculating freshness for {}", consumerState);
             tracker.run();
             return new PartitionResult(consumerState);
           } catch (Exception e) {
diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java
index 0f581a4..45cf8b4 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/FreshnessTracker.java
@@ -70,6 +70,12 @@ public void run() {
       }
       Instant now = Instant.now(clock);
       freshness = Math.max(now.toEpochMilli() - record.timestamp(), 0);
+      LOG.debug("Found freshness of {} from first uncommitted record {} for {}", freshness, record, consumer);
+    } else {
+      // up-to-date consumers are at the LEO (i.e. lag == 0), so we don't have a 'first uncommitted record' that
+      // would be interesting to log; just log the freshness == 0. It's in an 'else' to avoid double logging for
+      // consumers that are not up-to-date.
+      LOG.debug("{} recording {} ms freshness", consumer, freshness);
     }
     metrics.freshness.labels(this.consumer.cluster, this.consumer.group, this.from.topic(), Integer.toString(this.from.partition()))
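
The pattern patch 1 introduces is easier to see outside the diff. Below is a
minimal, self-contained sketch of the same idea - an IOException subclass that
carries the okhttp3 Response so the catch site can branch on the status code
and log an expected 404 as a single line instead of a stack trace. The class
name, client wiring, and URL are illustrative only, not the project's actual
code; it assumes okhttp3 on the classpath.

import java.io.IOException;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class BurrowStyleClient {

  /** IOException that keeps the failed Response so callers can inspect the code. */
  public static class UnsuccessfulResponseException extends IOException {
    public final transient Response response;

    public UnsuccessfulResponseException(Response response) {
      super("Response was not successful: " + response);
      this.response = response;
    }
  }

  private final OkHttpClient client = new OkHttpClient();

  /** GET the url, throwing a code-aware exception on any non-2xx response. */
  public String get(String url) throws IOException {
    Request request = new Request.Builder().url(url).get().build();
    try (Response response = client.newCall(request).execute()) {
      if (!response.isSuccessful()) {
        throw new UnsuccessfulResponseException(response);
      }
      return response.body().string();
    }
  }

  public static void main(String[] args) {
    BurrowStyleClient client = new BurrowStyleClient();
    try {
      // hypothetical Burrow-style endpoint, for illustration only
      client.get("http://localhost:8000/v3/kafka/local/consumer/missing-group/lag");
    } catch (UnsuccessfulResponseException e) {
      if (e.response.code() == 404) {
        // expected when the consumer is excluded by config: one line, no stack trace
        System.err.println("Skipping: " + e.getMessage());
      } else {
        e.printStackTrace();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Because UnsuccessfulResponseException still extends IOException, existing
catch (IOException e) blocks keep working unchanged; the 404 branch is purely
additive.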
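Patch 2's new debug lines report the freshness value computed in
FreshnessTracker.run(). The arithmetic is small enough to show as a toy
standalone sketch (the helper name and the up-to-date flag are illustrative;
the Math.max clamp mirrors the line in the diff above): freshness is the age
of the first uncommitted record, clamped at zero, and an up-to-date consumer
(lag == 0) reports 0 ms.

import java.time.Clock;
import java.time.Instant;

public class FreshnessMath {

  /**
   * Freshness in ms given the timestamp of the first uncommitted record;
   * up-to-date consumers (at the log-end offset) are 0 ms by definition.
   */
  static long freshnessMs(Clock clock, boolean upToDate, long recordTimestampMs) {
    if (upToDate) {
      return 0L; // at the LEO there is no uncommitted record to measure against
    }
    Instant now = Instant.now(clock);
    // clamp at zero in case the record timestamp is slightly in the future
    return Math.max(now.toEpochMilli() - recordTimestampMs, 0);
  }

  public static void main(String[] args) {
    Clock clock = Clock.systemUTC();
    long fiveSecondsAgo = Instant.now(clock).minusSeconds(5).toEpochMilli();
    System.out.println(freshnessMs(clock, false, fiveSecondsAgo)); // ~5000
    System.out.println(freshnessMs(clock, true, fiveSecondsAgo));  // 0
  }
}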