spring boot - TestConfiguration for @KafkaListener - Stack Overflow

I have a bunch of KafkaListeners:@KafkaListener(id = XXX_LISTENER_ID,containerFactory = "kafkaLis

I have a bunch of KafkaListeners:

@KafkaListener(
    id = XXX_LISTENER_ID,
    containerFactory = "kafkaListenerContainerFactory",
    topics = {"${XXX}"},
    groupId = "${XXX}",
    autoStartup = "false"
)
public void consumeXXX(PropertyUpdateEvent message, Acknowledgment ack) {
    xxxProcessor.process(message);
    ack.acknowledge();
}

For tests I use EmbededKafka and I was thinking that it will be enough to override ConsumerFactory and ConcurrentKafkaListenerContainerFactory in @TestConfiguration

@Bean
@Primary
@Qualifier("testConsumerFactory")
public ConsumerFactory<String, PropertyUpdateEvent> testConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Use embedded Kafka
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");  // Trust all packages for testing

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
@Primary
public ConcurrentKafkaListenerContainerFactory<String, PropertyUpdateEvent> kafkaListenerContainerFactory(
    @Qualifier("testConsumerFactory") ConsumerFactory<String, PropertyUpdateEvent> consumerFactory
) {
    ConcurrentKafkaListenerContainerFactory<String, PropertyUpdateEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(1);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

But @KafkaListener consumeXXX was never invoked. What do I do wrong?

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744692663a4588302.html

相关推荐

  • spring boot - TestConfiguration for @KafkaListener - Stack Overflow

    I have a bunch of KafkaListeners:@KafkaListener(id = XXX_LISTENER_ID,containerFactory = "kafkaLis

    1天前
    40

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信