I am implementing WebSockets using Spring Boot with STOMP for real-time notifications in my application. My setup includes a WebSocket configuration, security configuration, and a WebSocket handler decorator.
Issue:
I can successfully connect to the WebSocket endpoint ws://localhost:8080/notification/websocket using WebSocketKing and receive a successful upgrade handshake.
I can subscribe to /queue/unseen-bookings-1 and /queue/unseen-bookings-2 using different clients, and both subscriptions are acknowledged in the logs. The logs for /queue/unseen-bookings-1 are shown here; /queue/unseen-bookings-2 produces the same success logs.
2025-03-24 01:46:57 2025-03-24T01:46:57.940Z TRACE 1 --- [ XNIO-1 I/O-2] s.w.s.h.LoggingWebSocketHandlerDecorator : Handling TextMessage payload=[SUBSCRIBE_..], byteCount=54, last=true] in StandardWebSocketSession[id=0fe8173f-9c1b-6ce1-658e-38a33e556d1c, uri=/notification/websocket]
2025-03-24 01:46:57 2025-03-24T01:46:57.945Z TRACE 1 --- [ XNIO-1 I/O-2] o.s.messaging.simp.stomp.StompDecoder : Decoded SUBSCRIBE {id=["1"], destination=[/queue/unseen-bookings-1]} session=null
2025-03-24 01:46:57 2025-03-24T01:46:57.946Z TRACE 1 --- [ XNIO-1 I/O-2] o.s.w.s.m.StompSubProtocolHandler : From client: SUBSCRIBE /queue/unseen-bookings-1 id="1" session=0fe8173f-9c1b-6ce1-658e-38a33e556d1c
2025-03-24 01:46:57 2025-03-24T01:46:57.949Z DEBUG 1 --- [nboundChannel-2] o.s.m.s.b.SimpleBrokerMessageHandler : Processing SUBSCRIBE /queue/unseen-bookings-1 id="1" session=0fe8173f-9c1b-6ce1-658e-38a33e556d1c
2025-03-24 01:47:03 2025-03-24T01:47:03.493Z INFO 1 --- [MessageBroker-1] o.s.w.s.c.WebSocketMessageBrokerStats : WebSocketSession[1 current WS(1)-HttpStream(0)-HttpPoll(0), 1 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], stompBrokerRelay[null], inboundChannel[pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3], outboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], sockJsScheduler[pool size = 3, active threads = 1, queued tasks = 2, completed tasks = 0]
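For reference, a raw client such as WebSocketKing sends STOMP frames as plain text. Here is a minimal sketch of how CONNECT and SUBSCRIBE frames are laid out, assuming STOMP 1.2 framing (a command line, header lines, a blank line, an optional body, then a NUL terminator). The class name StompFrames, its helper methods, and the header values accept-version:1.2 and host:localhost are hypothetical and only illustrate the layout; they are not part of my application.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StompFrames {

    /** Builds one STOMP text frame: command, headers, blank line, body, NUL terminator. */
    public static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        sb.append('\n');      // the blank line ends the header section
        sb.append(body);      // empty for CONNECT and SUBSCRIBE
        sb.append("\u0000");  // the NUL character ends the frame
        return sb.toString();
    }

    /** CONNECT frame; the header values are assumptions chosen for illustration. */
    public static String connect(String host) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("accept-version", "1.2");
        headers.put("host", host);
        return frame("CONNECT", headers, "");
    }

    /** SUBSCRIBE frame for a single destination. */
    public static String subscribe(String id, String destination) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", id);
        headers.put("destination", destination);
        return frame("SUBSCRIBE", headers, "");
    }

    public static void main(String[] args) {
        // Print the frames with the NUL terminator made visible as ^@.
        System.out.println(connect("localhost").replace("\u0000", "^@"));
        System.out.println(subscribe("1", "/queue/unseen-bookings-1").replace("\u0000", "^@"));
    }
}
```

The stats line above reports stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], so it may be worth checking whether the client actually sends a CONNECT frame before its SUBSCRIBE frame.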
When I call my PATCH endpoint, which updates a booking, I use messagingTemplate.convertAndSend() to send a notification to the respective queue.
The logs confirm that the message is being broadcast:
Processing MESSAGE destination=/queue/unseen-bookings-1 session=null payload={"ownerUnseen":0,"renterUnseen":0}
Broadcasting to 1 sessions.
Processing MESSAGE destination=/queue/unseen-bookings-2 session=null payload={"ownerUnseen":0,"renterUnseen":1}
Broadcasting to 1 sessions.
However, neither client receives the message.
This is my configuration:
WebsocketSecurityConfiguration.java
@Configuration
public class WebsocketSecurityConfiguration extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
.nullDestMatcher()
.authenticated()
.simpDestMatchers("/topic/tracker")
.permitAll()
// matches any destination that starts with /topic/
.simpDestMatchers("/topic/**")
.permitAll()
// SUBSCRIBE and MESSAGE frames to any destination
.simpTypeMatchers(SimpMessageType.SUBSCRIBE,SimpMessageType.MESSAGE)
.permitAll()
// catch all
.anyMessage()
.permitAll();
}
/**
* Disables CSRF for Websockets.
*/
@Override
protected boolean sameOriginDisabled() {
return true;
}
}
WebsocketConfiguration.java
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue"); // Private and public topics
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/notification").setAllowedOriginPatterns("*").withSockJS(); // Enables SockJS fallback
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
return new EmaWebSocketHandlerDecorator(webSocketHandler);
}
});
}
}
EmaWebSocketHandlerDecorator.java
public class EmaWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
private static final Logger logger = LoggerFactory.getLogger(EmaWebSocketHandlerDecorator.class);
public EmaWebSocketHandlerDecorator(WebSocketHandler webSocketHandler) {
super(webSocketHandler);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
super.handleMessage(session, updateBodyIfNeeded(message));
}
/**
* Updates the content of the specified message. The message is updated only if it is
* a {@link TextMessage text message} and it does not end with the <tt>null</tt> character. If
* the trailing line feeds are missing (when the command does not allow a body), they are also added.
*/
private WebSocketMessage<?> updateBodyIfNeeded(WebSocketMessage<?> message) {
if (!(message instanceof TextMessage) || ((TextMessage) message).getPayload().endsWith("\u0000")) {
return message;
}
String payload = ((TextMessage) message).getPayload();
final Optional<StompCommand> stompCommand = getStompCommand(payload);
if (!stompCommand.isPresent()) {
return message;
}
if (!stompCommand.get().isBodyAllowed() && !payload.endsWith("\n\n")) {
if (payload.endsWith("\n")) {
payload += "\n";
} else {
payload += "\n\n";
}
}
payload += "\u0000";
return new TextMessage(payload);
}
/**
* Returns the {@link StompCommand STOMP command} associated to the specified payload.
*/
private Optional<StompCommand> getStompCommand(String payload) {
final int firstCarriageReturn = payload.indexOf('\n');
if (firstCarriageReturn < 0) {
return Optional.empty();
}
try {
return Optional.of(
StompCommand.valueOf(payload.substring(0, firstCarriageReturn))
);
} catch (IllegalArgumentException e) {
logger.trace("Error while parsing STOMP command.", e);
return Optional.empty();
}
}
}
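The normalisation performed by updateBodyIfNeeded can be tried in isolation. The sketch below mirrors that logic in plain Java without Spring, under the assumption that two small hand-written sets of command names stand in for StompCommand.valueOf() and StompCommand.isBodyAllowed(). The names FrameNormalizerSketch, BODILESS, and WITH_BODY are hypothetical, and the sets cover only a subset of STOMP commands.

```java
import java.util.Set;

public class FrameNormalizerSketch {

    // Assumption: a hand-picked subset of commands that do not carry a body.
    private static final Set<String> BODILESS =
        Set.of("CONNECT", "STOMP", "SUBSCRIBE", "UNSUBSCRIBE", "DISCONNECT");

    // Assumption: a hand-picked subset of commands that may carry a body.
    private static final Set<String> WITH_BODY = Set.of("SEND");

    /** Appends the missing blank line (bodiless commands only) and the NUL terminator. */
    public static String normalize(String payload) {
        if (payload.endsWith("\u0000")) {
            return payload; // already terminated, leave untouched
        }
        int firstLineFeed = payload.indexOf('\n');
        if (firstLineFeed < 0) {
            return payload; // no command line, not a STOMP frame
        }
        String command = payload.substring(0, firstLineFeed);
        boolean bodiless = BODILESS.contains(command);
        if (!bodiless && !WITH_BODY.contains(command)) {
            return payload; // unknown command, leave untouched
        }
        String result = payload;
        if (bodiless && !result.endsWith("\n\n")) {
            // Make sure the header section ends with a blank line.
            result += result.endsWith("\n") ? "\n" : "\n\n";
        }
        return result + "\u0000";
    }

    public static void main(String[] args) {
        String raw = "SUBSCRIBE\nid:1\ndestination:/queue/unseen-bookings-1";
        // Show the normalised frame with the NUL terminator made visible as ^@.
        System.out.println(normalize(raw).replace("\u0000", "^@"));
    }
}
```

Running a captured client payload through normalize shows exactly what the decorator hands to the STOMP decoder, which helps rule the decorator in or out as the cause.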
Controller method for the PATCH endpoint (simplified)
@PatchMapping(value = "/bookings/{id}", consumes = { "application/json", "application/merge-patch+json" })
public ResponseEntity<BookingWithOwnerDTO> partialUpdateBooking(
@PathVariable(value = "id", required = false) final Long id,
@NotNull @RequestBody Booking booking
) throws URISyntaxException, ExecutionException, InterruptedException {
int unseenOwnerBookings = bookingService.countUnseenBookingsForOwner(updatedBooking.getProduct().getOwner().getId());
int unseenRenterBookings = bookingService.countUnseenBookingsForRenter(updatedBooking.getProduct().getOwner().getId());
int unseenOwnerBookings1 = bookingService.countUnseenBookingsForOwner(updatedBooking.getUser().getId());
int unseenRenterBookings1 = bookingService.countUnseenBookingsForRenter(updatedBooking.getUser().getId());
BookingNotification notification = new BookingNotification(unseenOwnerBookings, unseenRenterBookings);
BookingNotification notification1 = new BookingNotification(unseenOwnerBookings1, unseenRenterBookings1);
messagingTemplate.convertAndSend(
"/queue/unseen-bookings-" + updatedBooking.getProduct().getOwner().getId().toString(),
notification
);
// Send message to the user who made the booking
messagingTemplate.convertAndSend(
"/queue/unseen-bookings-" + updatedBooking.getUser().getId().toString(),
notification1
);
return bookingWithOwnerDTO;
});
return ResponseUtil.wrapOrNotFound(
result,
HeaderUtil.createEntityUpdateAlert(applicationName, true, ENTITY_NAME, booking.getId().toString())
);
}
Why are my subscribed clients not receiving the message even though the logs show that it was broadcasted?
Could the issue be related to session=null in the logs? But if I am using the SimpleBroker, sessions shouldn't matter, right? Am I missing any configuration that would prevent the message from reaching the subscribed clients?
Would appreciate any guidance on what could be causing this issue!