Skip to content

kirillparfenov/embedded_kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Модель Message:

public class Message {
    private UUID id;
    private String title;
}

Консьюмер, принимающий Message в виде json:

public class Consumer {

    private final ObjectMapper objectMapper;
    private final MessageService messageService;

    @SneakyThrows
    @KafkaListener(topics = Topics.TEST_TOPIC, containerFactory = KafkaConfig.LISTENER_CONTAINER)
    public void handleMessage(String message) {
        log.info("Из топика [{}] получено сообщение: {}", Topics.TEST_TOPIC, message);
        var msg = objectMapper.readValue(message, Message.class);
        messageService.process(msg);
    }
}

MessageService для обработки Message:

public class MessageService {

    public void process(Message msg) {
        //логика обработки сообщения
    }
}

Конфигурация Kafka:

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
    public static final String LISTENER_CONTAINER = "LISTENER_CONTAINER";
    public static final String KAFKA_TEMPLATE = "KAFKA_TEMPLATE";

    private static final String PRODUCER_FACTORY = "PRODUCER_FACTORY";
    private static final String CONSUMER_FACTORY = "CONSUMER_FACTORY";

    private final KafkaProperties kafkaProperties;

    @Bean(PRODUCER_FACTORY)
    public ProducerFactory<String, String> producerFactory() {
        var props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean(CONSUMER_FACTORY)
    public ConsumerFactory<String, String> consumerFactory() {
        var props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getReset());
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(LISTENER_CONTAINER)
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory(
            @Qualifier(CONSUMER_FACTORY) ConsumerFactory<String, String> consumerFactory
    ) {
        var container = new ConcurrentKafkaListenerContainerFactory<String, String>();
        container.setConsumerFactory(consumerFactory);
        return container;
    }

    @Bean(KAFKA_TEMPLATE)
    public KafkaTemplate<String, String> kafkaTemplate(
            @Qualifier(PRODUCER_FACTORY) ProducerFactory<String, String> producerFactory
    ) {
        return new KafkaTemplate<>(producerFactory);
    }
}

@EnableKafka - без этой аннотации приложение не увидит @KafkaListener


Пропсы для теста в application-test.yaml:

kafka:
  servers: ${spring.embedded.kafka.brokers}
  reset: earliest
  group-id: test_group_id

${spring.embedded.kafka.brokers} - обязательно, чтобы консьюмеры и продюсеры смотрели в одно место


ConsumerTests для теста Consumer:

@DirtiesContext
@ActiveProfiles("test")
@TestInstance(PER_CLASS)
@EmbeddedKafka(
        partitions = 1,
        topics = Topics.TEST_TOPIC
)
@SpringBootTest(classes = {
        KafkaConfig.class
})
@EnableConfigurationProperties({
        KafkaProperties.class
})
public class ConsumerTests {}

@TestInstance(PER_CLASS) - дает возможность создать метод для @MethodSource не статическим

    @MockBean
    MessageService messageService;
    
    @SpyBean
    ObjectMapper objectMapper;
    @SpyBean
    Consumer consumer;
    
    @Autowired
    @Qualifier(KAFKA_TEMPLATE)
    KafkaTemplate<String, String> kafkaTemplate;
    
    @ParameterizedTest
    @MethodSource("messageSources")
    void testConsumer(Message msg) throws JsonProcessingException {
        var json = objectMapper.writeValueAsString(msg);
    
        kafkaTemplate.send(Topics.TEST_TOPIC, json);
    
        await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
            verify(consumer, times(1)).handleMessage(eq(json));
            verify(objectMapper, times(1)).readValue(eq(json), eq(Message.class));
            verify(messageService, times(1)).process(eq(msg));
        });
    }
    
    private Stream<Arguments> messageSources() {
        return Stream.of(
                Arguments.of(new Message(UUID.randomUUID(), UUID.randomUUID().toString())),
                Arguments.of(new Message(UUID.randomUUID(), UUID.randomUUID().toString()))
        );
    }

Releases

No releases published

Packages

No packages published

Languages