I have a Kafka producer and consumer configured in a Spring Boot application (Spring Boot 3.4.2).
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
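For context, the consumer side is a manually acknowledged listener, roughly like this (a simplified sketch; the class name and topic are placeholders):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class EventListener {

    // enable-auto-commit=false + ack-mode=MANUAL_IMMEDIATE:
    // the offset is committed as soon as acknowledge() is called
    @KafkaListener(topics = "some_topic")
    public void listen(String event, Acknowledgment ack) {
        // process the event ...
        ack.acknowledge();
    }
}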
In the application logic, a situation may occur where the producer tries to send an event to a topic that does not exist. I want to handle this situation in my code after the first unsuccessful attempt, but I have not found a way to do this correctly with exceptions. The producer just outputs a bunch of logs:
2025-03-25T15:31:52.363+05:00 WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 5 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
2025-03-25T15:31:52.601+05:00 WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 6 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
2025-03-25T15:31:52.989+05:00 WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 7 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
2025-03-25T15:31:53.973+05:00 WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 8 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
And it keeps retrying until a minute passes (the default max.block.ms is 60000 ms), after which it throws:
Exception thrown when sending a message with key='null' and payload='...' to topic new_topic and partition 0:
org.apache.kafka.common.errors.TimeoutException: Topic new_topic not present in metadata after 60000 ms.
I tried to set up exception handling:
try {
    kafkaTemplate.send(new ProducerRecord<>(topic, partition, System.currentTimeMillis(), null, event, headers));
} catch (KafkaException ex) {
    // handle
}
and added the setting spring.kafka.producer.properties.max.block.ms=10. The log was still output and the exception was caught as expected, but after that the attempts to fetch metadata continued in the background. I want to get rid of these retries. How can I do this?
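For reference, the fuller variant I am experimenting with looks roughly like this (a sketch, not my exact code; EventPublisher, the String/String generics, and the method parameters are placeholders for my application code):

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class EventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String topic, Integer partition, String event, Iterable<Header> headers) {
        try {
            CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(
                    new ProducerRecord<>(topic, partition, System.currentTimeMillis(), null, event, headers));
            future.whenComplete((result, ex) -> {
                if (ex != null) {
                    // failures that happen after the record was enqueued arrive here asynchronously
                }
            });
        } catch (KafkaException ex) {
            // org.apache.kafka.common.errors.TimeoutException ("not present in metadata
            // after max.block.ms") extends KafkaException and is thrown synchronously here
        }
    }
}

Even with the synchronous catch in place, the NetworkClient keeps logging UNKNOWN_TOPIC_OR_PARTITION warnings in the background, and those retries are what I want to stop.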