From 491b1b84961100a786b4da9eaca3572f9b893df9 Mon Sep 17 00:00:00 2001
From: Jesse Yates
Date: Wed, 29 Jun 2022 11:35:06 +0200
Subject: [PATCH] Cluster freshness should fail if no single partition succeeds (#58)

Across all the partitions for all topics for all consumers of a given
cluster, if we succeed at reading the freshness for at least one partition
then the cluster freshness check succeeds. Without this, if none of the
consumers could be evaluated successfully the cluster would still be marked
successful, even though that often indicated an incorrectly configured
cluster. However, if we are able to read even a single partition, then we
can at least reach the cluster; the remaining failures may just be
transient, so we should be allowed a next round to try to get more
successes.

Addresses #51
---
 .../consumer/freshness/ConsumerFreshness.java | 86 ++++++++++++++++---
 .../freshness/ConsumerFreshnessTest.java      | 26 +++---
 2 files changed, 91 insertions(+), 21 deletions(-)

diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
index e5a1186..a7c730b 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
@@ -40,6 +40,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -253,7 +254,7 @@ void run() {
    * @throws RuntimeException if there is a systemic problem that should shutdown the application.
    */
   private ListenableFuture measureCluster(Burrow.ClusterClient client) {
-    List>> completedConsumers = new ArrayList<>();
+    List<ListenableFuture<List<PartitionResult>>> completedConsumers = new ArrayList<>();
     List consumerGroups;
     try {
       consumerGroups = client.consumerGroups();
@@ -265,8 +266,9 @@ private ListenableFuture measureCluster(Burrow.ClusterClient client) {
       return Futures.immediateFailedFuture(e);
     }

+    String cluster = client.getCluster();
     try {
-      ArrayBlockingQueue workers = this.availableWorkers.get(client.getCluster());
+      ArrayBlockingQueue workers = this.availableWorkers.get(cluster);
       for (String consumerGroup : consumerGroups) {
         completedConsumers.add(measureConsumer(client, workers, consumerGroup));
       }
@@ -278,10 +280,40 @@ private ListenableFuture measureCluster(Burrow.ClusterClient client) {
       throw e;
     }

-    // if all the consumer measurements succeed, then we return the cluster name
+    // if at least one consumer measurement succeeds, then we return the cluster name
     // otherwise, Future.get will throw an exception representing the failure to measure a consumer (and thus the
     // failure to successfully monitor the cluster).
-    return Futures.whenAllSucceed(completedConsumers).call(client::getCluster, this.executor);
+    return Futures.whenAllSucceed(completedConsumers).call(() -> {
+      List<PartitionResult> allPartitions = completedConsumers.stream()
+          .flatMap(f -> {
+            // recall, these have all completed successfully by this point, unless it's something catastrophic, so
+            // this is safe to just re-throw if we do find an exception
+            try {
+              return f.get().stream();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }).collect(Collectors.toList());
+
+      long successes = allPartitions.stream()
+          .filter(result -> result.success)
+          .count();
+
+      // if any single partition for any single consumer succeeded, then we count the cluster as having been successful
+      if (successes != allPartitions.size()) {
+        LOG.info("Freshness for {} / {} partitions across all consumers succeeded for cluster {}",
+            successes, allPartitions.size(), cluster);
+      }
+
+      if (successes > 0) {
+        LOG.info("Got freshness for at least one partition for one consumer for {}, marking the cluster " +
+            "successful", cluster);
+        return cluster;
+      }
+
+      throw new RuntimeException("No single partition for any topic for any consumer for cluster " + cluster
+          + " returned successfully - is the cluster configured correctly?");
+    }, this.executor);
   }

   /**
@@ -299,7 +331,7 @@ private ListenableFuture measureCluster(Burrow.ClusterClient client) {
    * failed to be measured the entire future (i.e. calls to {@link Future#get()}) will be considered a failure.
    * @throws InterruptedException if the application is interrupted while waiting for an available worker
    */
-  private ListenableFuture> measureConsumer(Burrow.ClusterClient burrow,
+  private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterClient burrow,
                                                       ArrayBlockingQueue workers,
                                                       String consumerGroup) throws InterruptedException {
     Map status;
@@ -316,7 +348,7 @@ private ListenableFuture> measureConsumer(Burrow.ClusterClient burr

     boolean anyEndOffsetFound = false;
     List> partitions = (List>) status.get("partitions");
-    List> partitionFreshnessComputation = new ArrayList<>(partitions.size());
+    List<ListenableFuture<PartitionResult>> partitionFreshnessComputation = new ArrayList<>(partitions.size());
     for (Map state : partitions) {
       String topic = (String) state.get("topic");
       int partition = (int) state.get("partition");
@@ -336,7 +368,22 @@ private ListenableFuture> measureConsumer(Burrow.ClusterClient burr
       // wait for a consumer to become available
       KafkaConsumer consumer = workers.take();
-      ListenableFuture result = this.executor.submit(new FreshnessTracker(consumerState, consumer, metrics));
+      ListenableFuture<PartitionResult> result = this.executor.submit(new Callable<PartitionResult>() {
+        FreshnessTracker tracker = new FreshnessTracker(consumerState, consumer, metrics);
+
+        @Override
+        public PartitionResult call() {
+          try {
+            tracker.run();
+            return new PartitionResult(consumerState);
+          } catch (Exception e) {
+            // intentionally at debug - there are many reasons for failures and often many partitions will fail for
+            // one reason or another, which can clog the logs.
+            LOG.debug("Failed to evaluate freshness for {}", consumerState, e);
+            return new PartitionResult(consumerState, e);
+          }
+        }
+      });

       // Hand back the consumer to the available workers when the task is complete
       Futures.addCallback(result, new FutureCallback() {
         @Override
@@ -367,16 +414,33 @@ public void onFailure(Throwable throwable) {
         .map(partition -> {
           try {
             return partition.get();
-          } catch (Exception e) {
-            // skip it!
-            return null;
-          }
+          } catch (Exception e){
+            // can only happen if we are interrupted, or something catastrophic, which both fit our criteria for
+            // failing the consumer
+            throw new RuntimeException(e);
+          }
         })
-        .filter(Objects::isNull)
         .collect(Collectors.toList());
     }, this.executor);
   }

+  class PartitionResult {
+    FreshnessTracker.ConsumerOffset consumerOffset;
+    boolean success;
+    Optional<Throwable> errorCause;
+
+
+    public PartitionResult(FreshnessTracker.ConsumerOffset consumerOffset) {
+      this(consumerOffset, null);
+    }
+
+    public PartitionResult(FreshnessTracker.ConsumerOffset consumerOffset, Throwable errorCause) {
+      this.consumerOffset = consumerOffset;
+      this.errorCause = Optional.ofNullable(errorCause);
+      this.success = !this.errorCause.isPresent();
+    }
+  }
+
   private void stop() {
     if (this.executor != null) {
       this.executor.shutdown();
diff --git a/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java b/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java
index 4ec42e9..72ab8cf 100644
--- a/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java
+++ b/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java
@@ -38,6 +38,7 @@
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.function.Consumer;
@@ -105,9 +106,9 @@ public void testLargeCurrentLag() throws Exception {
      freshness.run();

      FreshnessMetrics metrics = freshness.getMetricsForTesting();
-
+     assertSuccessfulClusterMeasurement(freshness, "cluster1");
      Gauge.Child measurement = metrics.freshness.labels("cluster1", "group1", "topic1", "1");
-     assertTrue("Should have at least the specified lag for the group "+lagMs+", but found"+measurement.get(),
+     assertTrue("Should have at least the specified lag for the group " + lagMs + ", but found " + measurement.get(),
          measurement.get() >= lagMs);
    });
  }
@@ -146,12 +147,17 @@ public void testFailClusterWhenInterruptedWaitingForAConsumer() throws Exception
  public void testFailConsumerButNotClusterIfComputationFails() throws Exception {
    Burrow burrow = mock(Burrow.class);
    Burrow.ClusterClient client = mockClusterState("cluster1", "group1",
-       partitionState("topic1", 1, 10, 10L));
+       partitionState("topic1", 0, 10, 10L),
+       partitionState("topic1", 1, 10, 10L)
+    );
    when(burrow.getClusters()).thenReturn(newArrayList(client));

    withExecutor(executor -> {
      KafkaConsumer consumer = mock(KafkaConsumer.class);
-     when(consumer.poll(Mockito.any(Duration.class))).thenThrow(new RuntimeException("injected"));
+     // the first consumer lookup fails, the second one succeeds
+     when(consumer.poll(Mockito.any(Duration.class)))
+         .thenThrow(new RuntimeException("injected"))
+         .thenReturn(records("topic", 1, 10, 10L));

      ConsumerFreshness freshness = new ConsumerFreshness();
      freshness.setupForTesting(burrow, workers("cluster1", consumer), executor);
@@ -219,7 +225,7 @@ public void testBurrowFailingToReadConsumerGroupStatusMarksGroupError() throws E
        freshness.run();
        assertEquals(1.0, freshness.getMetricsForTesting().error.labels("cluster", "group").get(), 0.0);
-       // failing all the groups status lookup should not fail the cluster. Feels weird, but it's the current behavior
-       assertSuccessfulClusterMeasurement(freshness, "cluster");
+       // failing the status lookup for every group now fails the cluster, since not a single partition succeeds
+       assertNoSuccessfulClusterMeasurement(freshness, "cluster");
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
@@ -240,8 +246,8 @@ public void testBurrowMissingConsumerGroupPartitionsMarksErrorForGroup() throws
        when(client.getConsumerGroupStatus("group")).thenReturn(new HashMap<>());
        freshness.run();
        assertEquals(1.0, freshness.getMetricsForTesting().error.labels("cluster", "group").get(), 0.0);
-       // the cluster is overall successful, even though the group fails
-       assertSuccessfulClusterMeasurement(freshness, "cluster");
+       // no consumer group was successful, so the cluster is not successful
+       assertNoSuccessfulClusterMeasurement(freshness, "cluster");
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
@@ -267,8 +273,8 @@ public void testBurrowMissingConsumerGroupPartitionEndOffsetMarksMissing() throw
        )));
        freshness.run();
        assertEquals(1.0, freshness.getMetricsForTesting().missing.get(), 0.0);
-       // the cluster is overall successful, even though the group fails
-       assertSuccessfulClusterMeasurement(freshness, "cluster");
+       // no consumer group was successful, so the cluster is not successful
+       assertNoSuccessfulClusterMeasurement(freshness, "cluster");
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
@@ -420,7 +426,7 @@ public void testFailToSubmitTaskExitsTracker() throws Exception {
    Burrow.ClusterClient client = mockClusterState("cluster", "group", partitionState("t", 1, 1, 0));
    when(burrow.getClusters()).thenReturn(newArrayList(client));
    Exception cause = new RejectedExecutionException("injected");
-   when(executor.submit(any(FreshnessTracker.class))).thenThrow(cause);
+   when(executor.submit(any(Callable.class))).thenThrow(cause);
    thrown.expect(RuntimeException.class);
    thrown.expectCause(org.hamcrest.CoreMatchers.equalTo(cause));
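
Below is a minimal, stand-alone sketch of the rule the new lambda in measureCluster implements; it is not part of the patch, and the class and method names are hypothetical. A cluster measurement now succeeds as long as at least one partition across all consumer groups was read successfully, partial success is only logged, and the measurement fails only when every partition failed.

import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained illustration; not the ConsumerFreshness code itself.
public class ClusterSuccessRuleSketch {

  // Minimal stand-in for the patch's PartitionResult: just a success flag per partition.
  static final class PartitionResult {
    final boolean success;

    PartitionResult(boolean success) {
      this.success = success;
    }
  }

  // A cluster counts as measured if at least one partition, for any topic and any
  // consumer group, was measured successfully; it fails only when every partition failed.
  static String evaluateCluster(String cluster, List<PartitionResult> allPartitions) {
    long successes = allPartitions.stream().filter(r -> r.success).count();
    if (successes != allPartitions.size()) {
      System.out.printf("Freshness for %d / %d partitions succeeded for cluster %s%n",
          successes, allPartitions.size(), cluster);
    }
    if (successes > 0) {
      return cluster; // partial success still proves the cluster is reachable
    }
    throw new RuntimeException("No partition for cluster " + cluster
        + " returned successfully - is the cluster configured correctly?");
  }

  public static void main(String[] args) {
    // One of two partitions succeeded: the cluster is still marked successful.
    System.out.println(evaluateCluster("cluster1",
        Arrays.asList(new PartitionResult(false), new PartitionResult(true))));

    // No partition succeeded: the cluster measurement fails.
    try {
      evaluateCluster("cluster2", Arrays.asList(new PartitionResult(false)));
    } catch (RuntimeException e) {
      System.out.println("cluster2 failed as expected: " + e.getMessage());
    }
  }
}

In the patch itself the failure is not thrown directly to a caller; it propagates through the ListenableFuture returned by measureCluster, so callers observe it via Future.get, as the comment above the lambda describes.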