I have a bunch of KafkaListeners:
@KafkaListener(
        id = XXX_LISTENER_ID,
        containerFactory = "kafkaListenerContainerFactory",
        topics = {"${XXX}"},
        groupId = "${XXX}",
        autoStartup = "false"
)
public void consumeXXX(PropertyUpdateEvent message, Acknowledgment ack) {
    xxxProcessor.process(message);
    ack.acknowledge();
}
For tests I use @EmbeddedKafka, and I thought it would be enough to override ConsumerFactory and ConcurrentKafkaListenerContainerFactory in a @TestConfiguration:
@Bean
@Primary
@Qualifier("testConsumerFactory")
public ConsumerFactory<String, PropertyUpdateEvent> testConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Use embedded Kafka
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // Trust all packages for testing
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
@Primary
public ConcurrentKafkaListenerContainerFactory<String, PropertyUpdateEvent> kafkaListenerContainerFactory(
        @Qualifier("testConsumerFactory") ConsumerFactory<String, PropertyUpdateEvent> consumerFactory
) {
    ConcurrentKafkaListenerContainerFactory<String, PropertyUpdateEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(1);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
But the @KafkaListener method consumeXXX is never invoked. What am I doing wrong?