Skip to content

Commit

Permalink
[#3558] Make HonoKafkaConsumer compatible with Kafka 3.5
Browse files Browse the repository at this point in the history
The HonoKafkaConsumer has been adapted to explicitly enforce a
rebalance in order to get assigned partitions for auto-created
topics if the metadata.max.age.ms configuration property is not
set or is set to a value greater than 500.

Kafka 3.5 seems to have changed its rebalancing behavior with
regard to auto-created topics. Without this modification,
the assignment of new partitions to the consumer will be deferred
to after the metadata has become stale, which happens after 5 minutes
if not explicitly overridden using the metadata.max.age.ms
configuration property.
  • Loading branch information
sophokles73 committed Oct 7, 2023
1 parent a3e6da5 commit e826907
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 231 deletions.
5 changes: 5 additions & 0 deletions clients/kafka-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>core-test-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>kafka-test-utils</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -23,6 +23,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,12 +45,15 @@
import org.apache.kafka.common.record.TimestampType;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.kafka.test.KafkaMockConsumer;
import org.eclipse.hono.test.JUnitTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -283,12 +287,20 @@ public void testConsumerCreationWithTopicPatternSucceeds(final VertxTestContext
* Verifies that invoking <em>ensureTopicIsAmongSubscribedTopicPatternTopics</em> succeeds for
* a topic that matches the topic pattern but has been created after the consumer started.
*
* @param metadataMaxAgeMs The {@value ConsumerConfig#METADATA_MAX_AGE_CONFIG} value to set on the consumer.
* @param requiresExplicitRebalancing {@code true} if the max age value should result in the consumer enforcing a rebalance.
* @param ctx The vert.x test context.
*/
@Test
public void testEnsureTopicIsAmongSubscribedTopicsSucceedsForAddedTopic(final VertxTestContext ctx) {
@ParameterizedTest(name = JUnitTests.PARAMETERIZED_TEST_NAME_PATTERN)
@CsvSource(value = { ",true", "700,true", "300,false" })
public void testEnsureTopicIsAmongSubscribedTopicsSucceedsForAddedTopic(
final String metadataMaxAgeMs,
final boolean requiresExplicitRebalancing,
final VertxTestContext ctx) {
final var consumerConfig = consumerConfigProperties.getConsumerConfig("test");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
Optional.ofNullable(metadataMaxAgeMs)
.ifPresent(s -> consumerConfig.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, s));
final Promise<Void> readyTracker = Promise.promise();

mockConsumer.updateBeginningOffsets(Map.of(topicPartition, 0L, topic2Partition, 0L));
Expand Down Expand Up @@ -316,6 +328,7 @@ public void testEnsureTopicIsAmongSubscribedTopicsSucceedsForAddedTopic(final Ve
})
.onComplete(ctx.succeeding(ok -> {
ctx.verify(() -> {
assertThat(mockConsumer.shouldRebalance()).isEqualTo(requiresExplicitRebalancing);
assertThat(consumer.getSubscribedTopicPatternTopics()).containsExactly(TOPIC, TOPIC2, TOPIC3);
});
ctx.completeNow();
Expand Down Expand Up @@ -415,8 +428,17 @@ private ConsumerRecord<String, Buffer> createRecordWithElapsedTtl() {
final byte[] timestamp2SecondsAgo = Json.encode(Instant.now().minusSeconds(2).toEpochMilli()).getBytes();
final RecordHeader creationTime = new RecordHeader("creation-time", timestamp2SecondsAgo);

return new ConsumerRecord<>(TOPIC, PARTITION, 0, ConsumerRecord.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
(long) ConsumerRecord.NULL_CHECKSUM, ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE, "key_0",
Buffer.buffer(), new RecordHeaders(new Header[] { ttl, creationTime }));
return new ConsumerRecord<>(
TOPIC,
PARTITION,
0,
ConsumerRecord.NO_TIMESTAMP,
TimestampType.NO_TIMESTAMP_TYPE,
ConsumerRecord.NULL_SIZE,
ConsumerRecord.NULL_SIZE,
"key_0",
Buffer.buffer(),
new RecordHeaders(new Header[] { ttl, creationTime }),
Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.test;


/**
* Utility methods for implementing JUnit tests.
*/
public final class JUnitTests {

/**
* Pattern used for the <em>name</em> field of the {@code @ParameterizedTest} annotation.
*/
public static final String PARAMETERIZED_TEST_NAME_PATTERN = "{displayName} [{index}]; parameters: {argumentsWithNames}";

private JUnitTests() {
// prevent instantiation
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -70,6 +70,7 @@
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.service.management.device.Device;
import org.eclipse.hono.test.JUnitTests;
import org.eclipse.hono.test.VertxTools;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
Expand Down Expand Up @@ -493,7 +494,7 @@ public final class IntegrationTestSupport {
/**
* Pattern used for the <em>name</em> field of the {@code @ParameterizedTest} annotation.
*/
public static final String PARAMETERIZED_TEST_NAME_PATTERN = "{displayName} [{index}]; parameters: {argumentsWithNames}";
public static final String PARAMETERIZED_TEST_NAME_PATTERN = JUnitTests.PARAMETERIZED_TEST_NAME_PATTERN;

/**
* The default factor to apply when determining the timeout to use for executing test cases in a CI environment.
Expand Down Expand Up @@ -656,13 +657,13 @@ public static ClientConfigProperties getMessagingNetworkProperties() {
* @return The properties.
*/
public static MessagingKafkaConsumerConfigProperties getKafkaConsumerConfig() {
LOGGER.info("Configured to connect to Kafka on {}", IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS);
final MessagingKafkaConsumerConfigProperties consumerConfig = new MessagingKafkaConsumerConfigProperties();
consumerConfig.setConsumerConfig(Map.of(
LOGGER.info("Kafka Consumers are configured to connect to broker(s) at {}", IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS);
final var configProps = new MessagingKafkaConsumerConfigProperties();
configProps.setConsumerConfig(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.GROUP_ID_CONFIG, "its-" + UUID.randomUUID()));
return consumerConfig;
return configProps;
}

/**
Expand All @@ -671,11 +672,11 @@ public static MessagingKafkaConsumerConfigProperties getKafkaConsumerConfig() {
* @return The properties.
*/
public static MessagingKafkaProducerConfigProperties getKafkaProducerConfig() {
LOGGER.info("Configured to connect to Kafka on {}", IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS);
final MessagingKafkaProducerConfigProperties consumerConfig = new MessagingKafkaProducerConfigProperties();
consumerConfig.setProducerConfig(Map.of(
LOGGER.info("Kafka Producers are configured to connect to broker(s) at {}", IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS);
final var configProps = new MessagingKafkaProducerConfigProperties();
configProps.setProducerConfig(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS));
return consumerConfig;
return configProps;
}

/**
Expand All @@ -684,7 +685,7 @@ public static MessagingKafkaProducerConfigProperties getKafkaProducerConfig() {
* @return The properties.
*/
public static KafkaAdminClientConfigProperties getKafkaAdminClientConfig() {
LOGGER.info("Configured to connect to Kafka on {}", IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS);
LOGGER.info("Kafka Admin Clients are configured to connect to broker(s) at {}", IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS);
final KafkaAdminClientConfigProperties adminClientConfig = new KafkaAdminClientConfigProperties();
adminClientConfig.setAdminClientConfig(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IntegrationTestSupport.DOWNSTREAM_BOOTSTRAP_SERVERS,
Expand Down
Loading

0 comments on commit e826907

Please sign in to comment.