[#3669] Fix deletion of obsolete metrics in KafkaConsumer.
calohmn authored and sophokles73 committed Dec 10, 2024
1 parent bc86807 commit 43b8466
Showing 2 changed files with 159 additions and 19 deletions.
@@ -40,6 +40,11 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
@@ -94,7 +99,13 @@ public class HonoKafkaConsumer<V> implements Lifecycle, ServiceClient {
*/
public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250;

private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30);
/**
* The name of the configuration property to set the delay after which obsolete metrics for a deleted Kafka topic
* are removed from the metrics of the Kafka consumer.
*/
public static final String CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = "hono.obsolete.metrics.removal.delay.millis";

private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
private static final Logger LOG = LoggerFactory.getLogger(HonoKafkaConsumer.class);
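
A minimal sketch of how the new property might be supplied via the consumer config map (the broker address, group id and the 5000 ms delay are example assumptions, not values from this commit; the integration test further below sets the property the same way):

    // Sketch: supplying the new removal-delay property together with standard consumer settings.
    // Assumes java.util.HashMap/Map and the Kafka ConsumerConfig class are available.
    final Map<String, String> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example value
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // example value
    consumerConfig.put(HonoKafkaConsumer.CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, "5000"); // example value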

@@ -500,6 +511,16 @@ public final boolean isRecordHandlingAndPollingPaused() {
return pollingPaused.get();
}

/**
* Gets the metrics kept by the consumer.
*
* @return The metrics.
* @throws IllegalStateException if invoked before the KafkaConsumer is set via the {@link #start()} method.
*/
public final Map<MetricName, ? extends Metric> metrics() {
return getUnderlyingConsumer().metrics();
}
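
A usage sketch of the new metrics() accessor, filtering for the metrics of a single topic via the metric tags (the consumer variable and the topic name are assumptions; the integration test below uses the same pattern):

    // Sketch: collecting the names of all consumer metrics tagged with a given topic.
    // 'honoConsumer' is assumed to be a started HonoKafkaConsumer instance.
    final String topic = "example.topic"; // example value
    final List<String> topicMetricNames = honoConsumer.metrics().keySet().stream()
            .filter(metricName -> metricName.tags().containsValue(topic))
            .map(MetricName::name)
            .collect(Collectors.toList());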

/**
* Gets the used vert.x KafkaConsumer.
*
@@ -625,7 +646,7 @@ postponing record handling until consumer has been initialized \
/**
* {@inheritDoc}
* <p>
* This methods triggers the creation of a Kafka consumer in the background. A new attempt to create the
* This method triggers the creation of a Kafka consumer in the background. A new attempt to create the
* consumer is made periodically until creation succeeds or the {@link #stop()} method has been invoked.
* <p>
* Client code may {@linkplain #addOnKafkaConsumerReadyHandler(Handler) register a dedicated handler}
@@ -893,9 +914,13 @@ private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
.filter(t -> !subscribedTopicPatternTopics.contains(t))
.collect(Collectors.toSet());
if (!deletedTopics.isEmpty()) {
LOG.debug("deleted topics: {}", deletedTopics);
// actual removal to be done with a delay, as there might still be unprocessed fetch response data
// regarding these topics, in which case metrics would get re-created after they were removed
runOnContext(v -> vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, tid -> {
final long obsoleteMetricsRemovalDelayMillis = Optional
.ofNullable(consumerConfig.get(CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS))
.map(Long::parseLong).orElse(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT);
runOnContext(v -> vertx.setTimer(obsoleteMetricsRemovalDelayMillis, tid -> {
runOnKafkaWorkerThread(v2 -> {
removeMetricsForDeletedTopics(deletedTopics.stream()
.filter(t -> !subscribedTopicPatternTopics.contains(t)));
@@ -985,7 +1010,7 @@ protected void onPartitionsRevokedBlocking(final Set<TopicPartition> partitionsS
* <p>
* This default implementation does nothing. Subclasses may override this method.
*
* @param partitionsSet The list of partitions that are not assigned to this consumer any more.
* @param partitionsSet The list of partitions that are not assigned to this consumer anymore.
*/
protected void onPartitionsLostBlocking(final Set<TopicPartition> partitionsSet) {
// do nothing by default
@@ -1050,7 +1075,7 @@ protected void runOnContext(final Handler<Void> codeToRun) {
/**
* Runs the given handler on the Kafka polling thread.
* <p>
* The invocation of the handler is skipped if the this consumer is already closed.
* The invocation of the handler is skipped if this consumer is already closed.
*
* @param handler The handler to invoke.
* @throws IllegalStateException if the corresponding executor service isn't available because no subscription
@@ -1151,7 +1176,7 @@ public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(final S
LOG.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", topic);
return Future.succeededFuture();
}

LOG.debug("ensureTopicIsAmongSubscribedTopics: called for topic [{}]", topic);
synchronized (subscriptionUpdateTrackersForToBeAddedTopics) {
final var tracker = new SubscriptionUpdateTracker(topic);

@@ -1168,7 +1193,7 @@ public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(final S

private void triggerTopicPatternSubscriptionUpdate() {
if (!subscriptionUpdateTriggered.compareAndSet(false, true)) {
LOG.debug("ensureTopicIsAmongSubscribedTopics: subscription update already triggered");
LOG.debug("triggerTopicPatternSubscriptionUpdate: subscription update already triggered");
return;
}
runOnKafkaWorkerThread(v -> {
@@ -1198,7 +1223,7 @@ private void triggerTopicPatternSubscriptionUpdate() {
try {
LOG.info("triggering refresh of subscribed topic list ...");
getUnderlyingConsumer().subscribe(topicPattern, rebalanceListener);
if (!metadataMaxAge.isPresent() || metadataMaxAge.get() > THRESHOLD_METADATA_MAX_AGE_MS) {
if (metadataMaxAge.isEmpty() || metadataMaxAge.get() > THRESHOLD_METADATA_MAX_AGE_MS) {
// Partitions of newly created topics are being assigned by means of
// a rebalance. We make sure the rebalancing happens during the next poll()
// operation in order to avoid having to wait for the metadata to become stale
@@ -1217,35 +1242,46 @@ private void triggerTopicPatternSubscriptionUpdate() {
}

private void failAllSubscriptionUpdateTrackers(final Exception failure) {
final List<SubscriptionUpdateTracker> toBeFailedTrackers = new ArrayList<>();
final List<SubscriptionUpdateTracker> toBeFailedTrackers;
synchronized (subscriptionUpdateTrackersForToBeAddedTopics) {
toBeFailedTrackers.addAll(subscriptionUpdateTrackersForToBeAddedTopics.values());
toBeFailedTrackers = new ArrayList<>(subscriptionUpdateTrackersForToBeAddedTopics.values());
subscriptionUpdateTrackersForToBeAddedTopics.clear();
}
toBeFailedTrackers.forEach(tracker -> {
runOnContext(v -> tracker.fail(failure));
});
toBeFailedTrackers.forEach(tracker -> runOnContext(v -> tracker.fail(failure)));
}

private void removeMetricsForDeletedTopics(final Stream<String> deletedTopics) {
final Metrics metrics = getInternalMetricsObject(kafkaConsumer.unwrap());
if (metrics != null) {
deletedTopics.forEach(topic -> {
LOG.debug("removing metrics for deleted topic: {}", topic);
metrics.removeSensor("topic." + topic + ".bytes-fetched");
metrics.removeSensor("topic." + topic + ".records-fetched");
});
}
}

@SuppressWarnings("unchecked")
private Metrics getInternalMetricsObject(final Consumer<String, V> consumer) {
if (consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer) {
try {
final Field field = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
field.setAccessible(true);
return (Metrics) field.get(consumer);
final Field delegateField = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("delegate");
delegateField.setAccessible(true);
final ConsumerDelegate<String, V> delegate = (ConsumerDelegate<String, V>) delegateField.get(consumer);
if (delegate instanceof AsyncKafkaConsumer) {
final Field metricsField = AsyncKafkaConsumer.class.getDeclaredField("metrics");
metricsField.setAccessible(true);
return (Metrics) metricsField.get(delegate);
} else if (delegate instanceof LegacyKafkaConsumer) {
final Field metricsField = LegacyKafkaConsumer.class.getDeclaredField("metrics");
metricsField.setAccessible(true);
return (Metrics) metricsField.get(delegate);
}
} catch (final Exception e) {
LOG.warn("failed to get metrics object", e);
}
} else {
LOG.warn("unsupported consumer type: {}", consumer.getClass().getName());
}
return null;
}
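
The reflective lookup above reflects the fact that recent kafka-clients versions keep the Metrics instance on a ConsumerDelegate (AsyncKafkaConsumer or LegacyKafkaConsumer) behind the KafkaConsumer's 'delegate' field, whereas older versions, as handled by the previous implementation, declared 'metrics' directly on KafkaConsumer. A version-tolerant variant could look like the following sketch (an assumption, not part of this commit; it reuses the Field and Metrics imports already present in the file):

    // Sketch: try the "delegate" + "metrics" field path used by recent kafka-clients versions
    // first, then fall back to the "metrics" field that older versions declared directly on
    // KafkaConsumer, as the previous implementation of this method did.
    private Metrics getMetricsViaReflection(final org.apache.kafka.clients.consumer.KafkaConsumer<?, ?> consumer) {
        for (final String[] fieldPath : new String[][] { { "delegate", "metrics" }, { "metrics" } }) {
            try {
                Object target = consumer;
                for (final String fieldName : fieldPath) {
                    final Field field = target.getClass().getDeclaredField(fieldName);
                    field.setAccessible(true);
                    target = field.get(target);
                }
                return (Metrics) target;
            } catch (final ReflectiveOperationException e) {
                // field not found in this kafka-clients version: try the next candidate path
            }
        }
        return null;
    }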
@@ -1300,7 +1336,7 @@ private ExecutorService getKafkaConsumerWorker(final KafkaConsumer<String, V> co
return worker;
}

private final class SubscriptionUpdateTracker {
private static final class SubscriptionUpdateTracker {
private final Promise<Void> outcome = Promise.promise();
private final String topicName;
private final AtomicInteger rebalancesLeft = new AtomicInteger(10);
@@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -30,7 +31,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.MetricName;
import org.eclipse.hono.client.kafka.consumer.AsyncHandlingAutoCommitKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.tests.EnabledIfMessagingSystemConfigured;
import org.eclipse.hono.tests.IntegrationTestSupport;
import org.eclipse.hono.util.MessagingType;
@@ -49,6 +52,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.kafka.admin.KafkaAdminClient;
@@ -171,13 +175,13 @@ public void testConsumerReadsAllRecordsForDynamicallyCreatedTopics(

final var topicsToPublishTo = IntStream.range(0, numTopicsAndRecords)
.mapToObj(i -> "%s%d".formatted(patternPrefix, i))
.collect(Collectors.toList());
.toList();

// create some matching topics - these shall be deleted after consumer start;
// this shall make sure that topic deletion doesn't influence the test result
final var otherTopics = IntStream.range(0, numTopicsAndRecords)
.mapToObj(i -> "%s%d_other".formatted(patternPrefix, i))
.collect(Collectors.toList());
.toList();

final var recordsReceived = ctx.checkpoint(numTopicsAndRecords);
final String recordKey = "addedAfterStartKey";
@@ -235,6 +239,99 @@ public void testConsumerReadsAllRecordsForDynamicallyCreatedTopics(
}
}

/**
* Verifies that a topic-pattern based AsyncHandlingAutoCommitKafkaConsumer removes topic-related metrics
* once a topic that matches the topic-pattern gets deleted.
*
* NOTE: The logic for removing the metrics is located in HonoKafkaConsumer, so this test would ideally be placed
* in HonoKafkaConsumerIT. That proves to be difficult with the current integration test Kafka setup, however, in which
* topic auto-creation is enabled in the broker config. For some reason, even with 'allow.auto.create.topics=false'
* set for the consumer, offsets committed before the topic deletion and standard auto-commit disabled,
* the topic gets auto-created again some time after it has been deleted, causing the test to fail.
* With the manual auto-commit handling of the AsyncHandlingAutoCommitKafkaConsumer, this isn't the case.
*
* @param partitionAssignmentStrategy The partition assignment strategy to use for the consumer.
* @param ctx The vert.x test context.
*/
@ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
@MethodSource("partitionAssignmentStrategies")
@Timeout(value = 10, timeUnit = TimeUnit.SECONDS)
public void testPatternBasedConsumerRemovesMetricsOfDeletedTopics(
final String partitionAssignmentStrategy,
final VertxTestContext ctx) {

// prepare consumer
final var consumerConfig = IntegrationTestSupport.getKafkaConsumerConfig().getConsumerConfig("test");
applyPartitionAssignmentStrategy(consumerConfig, partitionAssignmentStrategy);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerConfig.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
consumerConfig.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100");
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerConfig.put(HonoKafkaConsumer.CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, "200");

final Promise<Void> recordReceivedPromise = Promise.promise();
final Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler = record -> {
LOG.debug("received record: {}", record);
recordReceivedPromise.complete();
return Future.succeededFuture();
};
final String topicPrefix = "test_" + UUID.randomUUID();
final String topic = topicPrefix + "_toBeDeleted";
kafkaConsumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Pattern.compile(topicPrefix + ".*"),
recordHandler, consumerConfig);
// create topic and start consumer
final Promise<Void> consumerReadyTracker = Promise.promise();
kafkaConsumer.addOnKafkaConsumerReadyHandler(consumerReadyTracker);
adminClient.createTopics(List.of(new NewTopic(topic, 1, REPLICATION_FACTOR)))
.compose(ok -> kafkaConsumer.start())
.compose(ok -> consumerReadyTracker.future())
.compose(ok -> {
ctx.verify(() -> assertThat(recordReceivedPromise.future().isComplete()).isFalse());
LOG.debug("consumer started, publishing record to be received by the consumer...");
return Future.all(
publish(topic, "recordKey", Buffer.buffer("testPayload")),
recordReceivedPromise.future());
})
.compose(ok -> {
LOG.debug("waiting for offset to be committed");
final Promise<Void> offsetCommitCheckTracker = Promise.promise();
final AtomicInteger checkCount = new AtomicInteger(0);
vertx.setPeriodic(100, tid -> {
if (!kafkaConsumer.isOffsetsCommitNeededForTopic(topic)) {
vertx.cancelTimer(tid);
offsetCommitCheckTracker.complete();
} else if (checkCount.incrementAndGet() >= 10) {
vertx.cancelTimer(tid);
offsetCommitCheckTracker.fail("timeout waiting for offset commit");
}
});
return offsetCommitCheckTracker.future();
})
.compose(ok -> {
ctx.verify(() -> assertThat(getTopicRelatedMetricNames(topic)).isNotEmpty());
LOG.debug("delete topic {}", topic);
return adminClient.deleteTopics(List.of(topic));
})
.compose(ok -> {
LOG.debug("waiting for metrics to be removed...");
final Promise<Void> metricCheckTracker = Promise.promise();
final AtomicInteger checkCount = new AtomicInteger(0);
vertx.setPeriodic(200, tid -> {
LOG.debug("topic-related metrics: {}", getTopicRelatedMetricNames(topic));
if (getTopicRelatedMetricNames(topic).isEmpty()) {
vertx.cancelTimer(tid);
metricCheckTracker.complete();
} else if (checkCount.incrementAndGet() >= 40) {
vertx.cancelTimer(tid);
metricCheckTracker.fail("timeout waiting for metrics to be removed");
}
});
return metricCheckTracker.future();
})
.onComplete(ctx.succeeding(v -> ctx.completeNow()));
}

private Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopicsAndPublish(
final VertxTestContext ctx,
final String topic,
@@ -288,5 +385,12 @@ private void applyPartitionAssignmentStrategy(final Map<String, String> consumer
Optional.ofNullable(partitionAssignmentStrategy)
.ifPresent(s -> consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, s));
}

private List<String> getTopicRelatedMetricNames(final String topicName) {
return kafkaConsumer.metrics().keySet().stream()
.filter(metricName -> metricName.tags().containsValue(topicName))
.map(MetricName::name)
.collect(Collectors.toList());
}
}
