spring - Message Sent to Queue but Not Received by Subscribed Clients - Stack Overflow

I am implementing WebSockets using Spring Boot with STOMP for real-time notifications in my application

I am implementing WebSockets using Spring Boot with STOMP for real-time notifications in my application. My setup includes a WebSocket configuration, security configuration, and a WebSocket handler decorator.

Issue:

I can successfully connect to the WebSocket endpoint ws://localhost:8080/notification/websocket using WebSocketKing and receive a successful upgrade handshake.

I can subscribe to /queue/unseen-bookings-1and /queue/unseen-bookings-2 using different clients, and the subscriptions are acknowledged in the logs and /queue/unseen-bookings-2 same success logs.

2025-03-24 01:46:57 2025-03-24T01:46:57.940Z TRACE 1 --- [   XNIO-1 I/O-2] s.w.s.h.LoggingWebSocketHandlerDecorator : Handling TextMessage payload=[SUBSCRIBE_..], byteCount=54, last=true] in StandardWebSocketSession[id=0fe8173f-9c1b-6ce1-658e-38a33e556d1c, uri=/notification/websocket]  
2025-03-24 01:46:57 2025-03-24T01:46:57.945Z TRACE 1 --- [   XNIO-1 I/O-2] o.s.messaging.simp.stomp.StompDecoder    : Decoded SUBSCRIBE {id=["1"], destination=[/queue/unseen-bookings-1]} session=null 
2025-03-24 01:46:57 2025-03-24T01:46:57.946Z TRACE 1 --- [   XNIO-1 I/O-2] o.s.w.s.m.StompSubProtocolHandler        : From client: SUBSCRIBE /queue/unseen-bookings-1 id="1" session=0fe8173f-9c1b-6ce1-658e-38a33e556d1c 
2025-03-24 01:46:57 2025-03-24T01:46:57.949Z DEBUG 1 --- [nboundChannel-2] o.s.m.s.b.SimpleBrokerMessageHandler     : Processing SUBSCRIBE /queue/unseen-bookings-1 id="1" session=0fe8173f-9c1b-6ce1-658e-38a33e556d1c 
2025-03-24 01:47:03 2025-03-24T01:47:03.493Z  INFO 1 --- [MessageBroker-1] o.s.w.s.c.WebSocketMessageBrokerStats    : WebSocketSession[1 current WS(1)-HttpStream(0)-HttpPoll(0), 1 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], stompBrokerRelay[null], inboundChannel[pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3], outboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], sockJsScheduler[pool size = 3, active threads = 1, queued tasks = 2, completed tasks = 0]  

When I call my PATCH endpoint, which updates a booking, I use messagingTemplate.convertAndSend() to send a notification to the respective queue.

The logs confirm that the message is being broadcasted:

Processing MESSAGE destination=/queue/unseen-bookings-1 session=null payload={"ownerUnseen":0,"renterUnseen":0}  
Broadcasting to 1 sessions.  
Processing MESSAGE destination=/queue/unseen-bookings-2 session=null payload={"ownerUnseen":0,"renterUnseen":1}  
Broadcasting to 1 sessions.  

However, my both clients do not receive the message.

This is my configuration

WebsocketSecurityConfiguration.java

@Configuration
public class WebsocketSecurityConfiguration extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
            .nullDestMatcher()
            .authenticated()
            .simpDestMatchers("/topic/tracker")
            .permitAll()            // matches any destination that starts with /topic/
            // (i.e. cannot send messages directly to /topic/)
            // (i.e. cannot subscribe to /topic/messages/* to get messages sent to
            // /topic/messages-user<id>)
            .simpDestMatchers("/topic/**")
            .permitAll()
            // message types other than MESSAGE and SUBSCRIBE
            .simpTypeMatchers(SimpMessageType.SUBSCRIBE,SimpMessageType.MESSAGE)
            .permitAll()
            // catch all
            .anyMessage()
            .permitAll();
    }

    /**
     * Disables CSRF for Websockets.
     */
    @Override
    protected boolean sameOriginDisabled() {
        return true;
    }
}

WebsocketConfiguration.java

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic", "/queue"); // Private and public topics
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/notification").setAllowedOriginPatterns("*") .withSockJS(); // Enables SockJS fallback
        ;
    }
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });
    }

}

EmaWebSocketHandlerDecorator.java

public class EmaWebSocketHandlerDecorator extends WebSocketHandlerDecorator {

    private static final Logger logger = LoggerFactory.getLogger(EmaWebSocketHandlerDecorator.class);

    public EmaWebSocketHandlerDecorator(WebSocketHandler webSocketHandler) {
        super(webSocketHandler);
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        super.handleMessage(session, updateBodyIfNeeded(message));
    }

    /**
     * Updates the content of the specified message. The message is updated only if it is
     * a {@link TextMessage text message} and if does not contain the <tt>null</tt> character at the end. If
     * carriage returns are missing (when the command does not need a body) there are also added.
     */
    private WebSocketMessage<?> updateBodyIfNeeded(WebSocketMessage<?> message) {
        if (!(message instanceof TextMessage) || ((TextMessage) message).getPayload().endsWith("\u0000")) {
            return message;
        }

        String payload = ((TextMessage) message).getPayload();

        final Optional<StompCommand> stompCommand = getStompCommand(payload);

        if (!stompCommand.isPresent()) {
            return message;
        }

        if (!stompCommand.get().isBodyAllowed() && !payload.endsWith("\n\n")) {
            if (payload.endsWith("\n")) {
                payload += "\n";
            } else {
                payload += "\n\n";
            }
        }

        payload += "\u0000";

        return new TextMessage(payload);
    }

    /**
     * Returns the {@link StompCommand STOMP command} associated to the specified payload.
     */
    private Optional<StompCommand> getStompCommand(String payload) {
        final int firstCarriageReturn = payload.indexOf('\n');

        if (firstCarriageReturn < 0) {
            return Optional.empty();
        }

        try {
            return Optional.of(
                StompCommand.valueOf(payload.substring(0, firstCarriageReturn))
            );
        } catch (IllegalArgumentException e) {
            logger.trace("Error while parsing STOMP command.", e);

            return Optional.empty();
        }
    }

}

EmaWebSocketHandlerDecorator (simplified)


    @PatchMapping(value = "/bookings/{id}", consumes = { "application/json", "application/merge-patch+json" })
    public ResponseEntity<BookingWithOwnerDTO> partialUpdateBooking(
        @PathVariable(value = "id", required = false) final Long id,
        @NotNull @RequestBody Booking booking
    ) throws URISyntaxException, ExecutionException, InterruptedException {
       
                int unseenOwnerBookings = bookingService.countUnseenBookingsForOwner(updatedBooking.getProduct().getOwner().getId());
                int unseenRenterBookings = bookingService.countUnseenBookingsForRenter(updatedBooking.getProduct().getOwner().getId());
                int unseenOwnerBookings1 = bookingService.countUnseenBookingsForOwner(updatedBooking.getUser().getId());
                int unseenRenterBookings1 = bookingService.countUnseenBookingsForRenter(updatedBooking.getUser().getId());

                BookingNotification notification = new BookingNotification(unseenOwnerBookings, unseenRenterBookings);
                BookingNotification notification1 = new BookingNotification(unseenOwnerBookings1, unseenRenterBookings1);



                messagingTemplate.convertAndSend(
                    "/queue/unseen-bookings-" + updatedBooking.getProduct().getOwner().getId().toString(),
                    notification
                );

// Send message to the user who made the booking
                messagingTemplate.convertAndSend(
                    "/queue/unseen-bookings-" + updatedBooking.getUser().getId().toString(),
                    notification1
                );


                return bookingWithOwnerDTO;
            });
        return ResponseUtil.wrapOrNotFound(
            result,
            HeaderUtil.createEntityUpdateAlert(applicationName, true, ENTITY_NAME, booking.getId().toString())
        );
    }

Why are my subscribed clients not receiving the message even though the logs show that it was broadcasted?

Could the issue be related to session=null in the logs? but if am using using SimpleBroker sessions don't matter, right? Am I missing any configuration that would prevent the message from reaching the subscribed clients?

Would appreciate any guidance on what could be causing this issue!

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744226381a4564025.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信