diff --git a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
index e5a1186..a7c730b 100644
--- a/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
+++ b/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness.java
@@ -40,6 +40,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -253,7 +254,7 @@ void run() {
    * @throws RuntimeException if there is a systemic problem that should shutdown the application.
    */
   private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
-    List<ListenableFuture<List<Object>>> completedConsumers = new ArrayList<>();
+    List<ListenableFuture<List<PartitionResult>>> completedConsumers = new ArrayList<>();
     List<String> consumerGroups;
     try {
       consumerGroups = client.consumerGroups();
@@ -265,8 +266,9 @@ private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
       return Futures.immediateFailedFuture(e);
     }
 
+    String cluster = client.getCluster();
     try {
-      ArrayBlockingQueue<KafkaConsumer> workers = this.availableWorkers.get(client.getCluster());
+      ArrayBlockingQueue<KafkaConsumer> workers = this.availableWorkers.get(cluster);
       for (String consumerGroup : consumerGroups) {
         completedConsumers.add(measureConsumer(client, workers, consumerGroup));
       }
@@ -278,10 +280,40 @@ private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
       throw e;
     }
 
-    // if all the consumer measurements succeed, then we return the cluster name
+    // if at least one consumer measurement succeeds, then we return the cluster name
     // otherwise, Future.get will throw an exception representing the failure to measure a consumer (and thus the
     // failure to successfully monitor the cluster).
-    return Futures.whenAllSucceed(completedConsumers).call(client::getCluster, this.executor);
+    return Futures.whenAllSucceed(completedConsumers).call(() -> {
+      List<PartitionResult> allPartitions = completedConsumers.stream()
+          .flatMap(f -> {
+            // recall, these have all completed successfully by this point, unless it's something catastrophic, so
+            // this is safe to just re-throw if we do find an exception
+            try {
+              return f.get().stream();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }).collect(Collectors.toList());
+
+      long successes = allPartitions.stream()
+          .filter(result -> result.success)
+          .count();
+
+      if (successes != allPartitions.size()) {
+        LOG.info("Freshness for {} / {} partitions across all consumers succeeded for cluster {}",
+            successes, allPartitions.size(), cluster);
+      }
+
+      // if any single partition for any single consumer succeeded, we count the cluster as successful
+      if (successes > 0) {
+        LOG.info("Got freshness for at least one partition of at least one consumer for {}, marking the cluster "
+            + "successful", cluster);
+        return cluster;
+      }
+
+      throw new RuntimeException("No single partition for any topic for any consumer for cluster " + cluster
+          + " returned successfully - is the cluster configured correctly?");
+    }, this.executor);
   }
 
   /**
@@ -299,7 +331,7 @@ private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
    * failed to be measured the entire future (i.e. calls to {@link Future#get()}) will be considered a failure.
   * @throws InterruptedException if the application is interrupted while waiting for an available worker
    */
-  private ListenableFuture<List<Object>> measureConsumer(Burrow.ClusterClient burrow,
+  private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterClient burrow,
       ArrayBlockingQueue<KafkaConsumer> workers, String consumerGroup) throws InterruptedException {
     Map<String, Object> status;
     try {
@@ -316,7 +348,7 @@ private ListenableFuture<List<Object>> measureConsumer(Burrow.ClusterClient burr
 
     boolean anyEndOffsetFound = false;
     List<Map<String, Object>> partitions = (List<Map<String, Object>>) status.get("partitions");
-    List<ListenableFuture<?>> partitionFreshnessComputation = new ArrayList<>(partitions.size());
+    List<ListenableFuture<PartitionResult>> partitionFreshnessComputation = new ArrayList<>(partitions.size());
     for (Map<String, Object> state : partitions) {
       String topic = (String) state.get("topic");
       int partition = (int) state.get("partition");
@@ -336,7 +368,22 @@ private ListenableFuture<List<Object>> measureConsumer(Burrow.ClusterClient burr
 
       // wait for a consumer to become available
       KafkaConsumer consumer = workers.take();
-      ListenableFuture<?> result = this.executor.submit(new FreshnessTracker(consumerState, consumer, metrics));
+      ListenableFuture<PartitionResult> result = this.executor.submit(new Callable<PartitionResult>() {
+        FreshnessTracker tracker = new FreshnessTracker(consumerState, consumer, metrics);
+
+        @Override
+        public PartitionResult call() {
+          try {
+            tracker.run();
+            return new PartitionResult(consumerState);
+          } catch (Exception e) {
+            // intentionally at debug - there are many reasons for failures and often many partitions will fail for
+            // one reason or another, which can clog the logs.
+            LOG.debug("Failed to evaluate freshness for {}", consumerState, e);
+            return new PartitionResult(consumerState, e);
+          }
+        }
+      });
       // Hand back the consumer to the available workers when the task is complete
       Futures.addCallback(result, new FutureCallback<Object>() {
         @Override
@@ -367,16 +414,33 @@ public void onFailure(Throwable throwable) {
           .map(partition -> {
             try {
               return partition.get();
             } catch (Exception e) {
-              // skip it!
-              return null;
+              // only can happen if we are interrupted, or something catastrophic, which both fit our criteria for
+              // failing the consumer
+              throw new RuntimeException(e);
             }
           })
-          .filter(Objects::isNull)
           .collect(Collectors.toList());
     }, this.executor);
   }
 
+  class PartitionResult {
+    FreshnessTracker.ConsumerOffset consumerOffset;
+    boolean success;
+    Optional<Throwable> errorCause;
+
+    public PartitionResult(FreshnessTracker.ConsumerOffset consumerOffset) {
+      this(consumerOffset, null);
+    }
+
+    public PartitionResult(FreshnessTracker.ConsumerOffset consumerOffset, Throwable errorCause) {
+      this.consumerOffset = consumerOffset;
+      this.errorCause = Optional.ofNullable(errorCause);
+      this.success = !this.errorCause.isPresent();
+    }
+  }
+
   private void stop() {
     if (this.executor != null) {
       this.executor.shutdown();
diff --git a/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java b/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java
index 4ec42e9..72ab8cf 100644
--- a/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java
+++ b/kafka_consumer_freshness_tracker/src/test/java/com/tesla/data/consumer/freshness/ConsumerFreshnessTest.java
@@ -38,6 +38,7 @@
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.function.Consumer;
@@ -105,9 +106,9 @@ public void testLargeCurrentLag() throws Exception {
       freshness.run();
 
       FreshnessMetrics metrics = freshness.getMetricsForTesting();
-
+      assertSuccessfulClusterMeasurement(freshness, "cluster1");
       Gauge.Child measurement = metrics.freshness.labels("cluster1", "group1", "topic1", "1");
-      assertTrue("Should have at least the specified lag for the group "+lagMs+", but found"+measurement.get(),
+      assertTrue("Should have at least the specified lag for the group " + lagMs + ", but found " + measurement.get(),
           measurement.get() >= lagMs);
     });
   }
@@ -146,12 +147,17 @@ public void testFailClusterWhenInterruptedWaitingForAConsumer() throws Exception
   public void testFailConsumerButNotClusterIfComputationFails() throws Exception {
     Burrow burrow = mock(Burrow.class);
     Burrow.ClusterClient client = mockClusterState("cluster1", "group1",
-        partitionState("topic1", 1, 10, 10L));
+        partitionState("topic1", 0, 10, 10L),
+        partitionState("topic1", 1, 10, 10L)
+    );
     when(burrow.getClusters()).thenReturn(newArrayList(client));
 
     withExecutor(executor -> {
       KafkaConsumer consumer = mock(KafkaConsumer.class);
-      when(consumer.poll(Mockito.any(Duration.class))).thenThrow(new RuntimeException("injected"));
+      // the first consumer lookup fails, the second one succeeds
+      when(consumer.poll(Mockito.any(Duration.class)))
+          .thenThrow(new RuntimeException("injected"))
+          .thenReturn(records("topic", 1, 10, 10L));
 
       ConsumerFreshness freshness = new ConsumerFreshness();
       freshness.setupForTesting(burrow, workers("cluster1", consumer), executor);
@@ -219,7 +225,7 @@ public void testBurrowFailingToReadConsumerGroupStatusMarksGroupError() throws E
         freshness.run();
         assertEquals(1.0, freshness.getMetricsForTesting().error.labels("cluster", "group").get(), 0.0);
-        // failing all the groups status lookup should not fail the cluster. Feels weird, but it's the current behavior
-        assertSuccessfulClusterMeasurement(freshness, "cluster");
+        // failing all the group status lookups means there is no successful measurement for the cluster
+        assertNoSuccessfulClusterMeasurement(freshness, "cluster");
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -240,8 +246,8 @@ public void testBurrowMissingConsumerGroupPartitionsMarksErrorForGroup() throws
         when(client.getConsumerGroupStatus("group")).thenReturn(new HashMap<>());
         freshness.run();
         assertEquals(1.0, freshness.getMetricsForTesting().error.labels("cluster", "group").get(), 0.0);
-        // the cluster is overall successful, even though the group fails
-        assertSuccessfulClusterMeasurement(freshness, "cluster");
+        // no consumer group was successful, so the cluster is not successful
+        assertNoSuccessfulClusterMeasurement(freshness, "cluster");
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -267,8 +273,8 @@ public void testBurrowMissingConsumerGroupPartitionEndOffsetMarksMissing() throw
         )));
         freshness.run();
         assertEquals(1.0, freshness.getMetricsForTesting().missing.get(), 0.0);
-        // the cluster is overall successful, even though the group fails
-        assertSuccessfulClusterMeasurement(freshness, "cluster");
+        // no consumer group was successful, so the cluster is not successful
+        assertNoSuccessfulClusterMeasurement(freshness, "cluster");
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -420,7 +426,7 @@ public void testFailToSubmitTaskExitsTracker() throws Exception {
     Burrow.ClusterClient client = mockClusterState("cluster", "group", partitionState("t", 1, 1, 0));
     when(burrow.getClusters()).thenReturn(newArrayList(client));
     Exception cause = new RejectedExecutionException("injected");
-    when(executor.submit(any(FreshnessTracker.class))).thenThrow(cause);
+    when(executor.submit(any(Callable.class))).thenThrow(cause);
 
     thrown.expect(RuntimeException.class);
     thrown.expectCause(org.hamcrest.CoreMatchers.equalTo(cause));