Spring integration: 6.2.0
I have noticed that even if I set a sendFailureChannel
on a Kafka.outboundChannelAdapter
an avro SerializationException
is still rethrown and ends up in the error output whether a errorChannel
header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel
is set, or alternatively to take into account the errorChannel
.
More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL
is a direct channel)
public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";
@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
.log(log.getName(),
m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
+ "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
+ "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
.get();
}
@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
log.error("SendFailure:", exception);
}
@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
log.error("Failed to send message to Kafka.", exception);
}
Spring integration: 6.2.0
I have noticed that even if I set a sendFailureChannel
on a Kafka.outboundChannelAdapter
an avro SerializationException
is still rethrown and ends up in the error output whether a errorChannel
header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel
is set, or alternatively to take into account the errorChannel
.
More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL
is a direct channel)
public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";
@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
.log(log.getName(),
m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
+ "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
+ "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
.get();
}
@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
log.error("SendFailure:", exception);
}
@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
log.error("Failed to send message to Kafka.", exception);
}
Share
Improve this question
asked Nov 18, 2024 at 15:35
Maxime DutautMaxime Dutaut
1326 bronze badges
1 Answer
Reset to default 0That's correct.
The logic there is like this:
catch (RuntimeException rtex) {
sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
throw rtex;
}
So, indeed such an error is sent to the sendFailureChannel
and re-thrown back to the caller.
The header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL)
is not involved here because process is fully direct. The errorChannel
is only considered when we deal with async channels.
We may consider to change such a logic, but only for tomorrow's 6.4.0
release.
What I'm trying to say, that I am agreed with you that it is not supposed to be like that: if we provide sendFailureChannel
, then no reason to re-throw such an exception.
Feel free to raise a GH issue and we will address it shortly!
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1745611136a4635954.html
评论列表(0条)