spring - RabbitMQ queue is empty even if message was published - Stack Overflow

I am trying to create a app using RabbitMQ and i am stuck at a point and I do not know what to do. I tr

I am trying to create a app using RabbitMQ and i am stuck at a point and I do not know what to do. I tried to follow a tutorial and everything should have worked, but it doesn't. Even if I am sending a postman request, even if I am doing the publishing from the RabbitMQ portal at localhost:15672, the pop up with message published appears, I get this text in the console, but when I want to check the queue, it is saying that is empty.

It is written very clear that the message has arrived to the consumer, but still, it the queue when I am checking from the portal, it doesn't

I will show the entire code now.

CONFIG

package ro.tuc.ds2020.config;

import .springframework.amqp.core.*;
import .springframework.amqp.rabbit.connection.ConnectionFactory;
import .springframework.amqp.rabbit.core.RabbitTemplate;
import .springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import .springframework.amqp.support.converter.MessageConverter;
import .springframework.beans.factory.annotation.Value;
import .springframework.context.annotation.Bean;
import .springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

CONSUMER

package ro.tuc.ds2020.consumer;

import .slf4j.Logger;
import .slf4j.LoggerFactory;
import .springframework.amqp.rabbit.annotation.RabbitListener;
import .springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
    }

}

CONTROLLER for when I am using postman

package ro.tuc.ds2020.controllers;

import .springframework.http.ResponseEntity;
import .springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);
        return  ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

PUBLISHER

package ro.tuc.ds2020.publisher;

import .slf4j.Logger;
import .slf4j.LoggerFactory;
import .springframework.amqp.rabbit.core.RabbitTemplate;
import .springframework.beans.factory.annotation.Autowired;
import .springframework.beans.factory.annotation.Value;
import .springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }

}

And here is the application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

I am trying to create a app using RabbitMQ and i am stuck at a point and I do not know what to do. I tried to follow a tutorial and everything should have worked, but it doesn't. Even if I am sending a postman request, even if I am doing the publishing from the RabbitMQ portal at localhost:15672, the pop up with message published appears, I get this text in the console, but when I want to check the queue, it is saying that is empty.

It is written very clear that the message has arrived to the consumer, but still, it the queue when I am checking from the portal, it doesn't

I will show the entire code now.

CONFIG

package ro.tuc.ds2020.config;

import .springframework.amqp.core.*;
import .springframework.amqp.rabbit.connection.ConnectionFactory;
import .springframework.amqp.rabbit.core.RabbitTemplate;
import .springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import .springframework.amqp.support.converter.MessageConverter;
import .springframework.beans.factory.annotation.Value;
import .springframework.context.annotation.Bean;
import .springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

CONSUMER

package ro.tuc.ds2020.consumer;

import .slf4j.Logger;
import .slf4j.LoggerFactory;
import .springframework.amqp.rabbit.annotation.RabbitListener;
import .springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
    }

}

CONTROLLER for when I am using postman

package ro.tuc.ds2020.controllers;

import .springframework.http.ResponseEntity;
import .springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);
        return  ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

PUBLISHER

package ro.tuc.ds2020.publisher;

import .slf4j.Logger;
import .slf4j.LoggerFactory;
import .springframework.amqp.rabbit.core.RabbitTemplate;
import .springframework.beans.factory.annotation.Autowired;
import .springframework.beans.factory.annotation.Value;
import .springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }

}

And here is the application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json
Share Improve this question asked Nov 16, 2024 at 19:31 CornelCornel 797 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

Java often presents more challenges in setting up a proper working environment compared to other languages like Node.js or Python, which are generally easier to configure.

Requirement

Maven 3.9.9 and JDK 17

> mvn --version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: C:\Users\benchvue\maven\apache-maven-3.9.9
Java version: 17.0.12, vendor: Amazon Inc., runtime: C:\Program Files\Amazon Corretto\jdk17.0.12_7
Default locale: en_US, platform encoding: Cp1252
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"

File Tree

C:.
│   docker-compose.yml
│   pom.xml
│
├───.idea
│       .gitignore
│       compiler.xml
│       encodings.xml
│       jarRepositories.xml
│       misc.xml
│
└───src
    └───main
        ├───java
        │   └───ro
        │       └───tuc
        │           └───ds2020
        │               │   Ds2020Application.java
        │               │
        │               ├───config
        │               │       RabbitMQConfig.java
        │               │
        │               ├───consumer
        │               │       RabbitMQJsonConsumer.java
        │               │
        │               ├───controllers
        │               │       MessageJsonController.java
        │               │
        │               ├───dtos
        │               │       MeasurementDTO.java
        │               │
        │               └───publisher
        │                       RabbitMQJsonProducer.java
        │
        └───resources
            │   application.properties
            │
            └───static

RabbitMQConfig.java

package ro.tuc.ds2020.config;

import .springframework.amqp.core.*;
import .springframework.amqp.rabbit.connection.ConnectionFactory;
import .springframework.amqp.rabbit.core.RabbitTemplate;
import .springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import .springframework.amqp.support.converter.MessageConverter;
import .springframework.beans.factory.annotation.Value;
import .springframework.context.annotation.Bean;
import .springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

RabbitMQJsonConsumer.java

package ro.tuc.ds2020.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import .slf4j.Logger;
import .slf4j.LoggerFactory;
import .springframework.amqp.rabbit.annotation.RabbitListener;
import .springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        try {
            String jsonMessage = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(measurementDTO);
            LOGGER.info("Received JSON message here -> \n{}", jsonMessage);
        } catch (JsonProcessingException e) {
            LOGGER.error("Failed to convert message to JSON", e);
        }
    }
}

MessageJsonController/java

package ro.tuc.ds2020.controllers;

import .springframework.http.ResponseEntity;
import .springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

import java.util.HashMap;
import java.util.Map;

@RequestMapping("/api/v1")
@RestController
public class MessageJsonController {

    private final RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<Map<String, String>> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);

        // Create a JSON response body
        Map<String, String> response = new HashMap<>();
        response.put("message", "Json message sent to RabbitMQ");
        response.put("status", "success");

        return ResponseEntity.ok(response);
    }
}

MeasurementDTO.java

package ro.tuc.ds2020.dtos;

import com.fasterxml.jackson.annotation.JsonProperty;

public class MeasurementDTO {

    @JsonProperty("sensorId")
    private String sensorId;

    @JsonProperty("value")
    private double value;

    @JsonProperty("unit")
    private String unit;

    @JsonProperty("timestamp")
    private String timestamp;

    // Getters and Setters
    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "MeasurementDTO{" +
                "sensorId='" + sensorId + '\'' +
                ", value=" + value +
                ", unit='" + unit + '\'' +
                ", timestamp='" + timestamp + '\'' +
                '}';
    }
}

RabbitMQJsonProducer.java

package ro.tuc.ds2020.publisher;

import .slf4j.Logger;
import .slf4j.LoggerFactory;
import .springframework.amqp.rabbit.core.RabbitTemplate;
import .springframework.beans.factory.annotation.Autowired;
import .springframework.beans.factory.annotation.Value;
import .springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }
}

Ds2020Application.java

package ro.tuc.ds2020;

import .springframework.boot.SpringApplication;
import .springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Ds2020Application {
    public static void main(String[] args) {
        SpringApplication.run(Ds2020Application.class, args);
    }
}

application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

pom.xml

<project xmlns="http://maven.apache./POM/4.0.0" xmlns:xsi="http://www.w3./2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache./POM/4.0.0 http://maven.apache./xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ro.tuc</groupId>
    <artifactId>ds2020</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter AMQP -->
        <dependency>
            <groupId>.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven Plugin -->
            <plugin>
                <groupId>.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>        
    </build>
</project>

docker-compose.yml

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672" # RabbitMQ messaging port
      - "15672:15672" # RabbitMQ management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

Launching RabbitMQ

docker compose up

Access RabbitMQ UI

username: guest
password: guest
http://localhost:15672/#/

Compile jar

mvn clean install

dir target

launching Java project

java -jar target/ds2020-1.0.0.jar

Call REST API by Postman

POST http://localhost:8080/api/v1/publish

Input Body

{
    "sensorId": "12345",
    "value": 67.5,
    "unit": "Celsius",
    "timestamp": "2024-11-16T18:30:00Z"
}

Java Side

Consumer will display in Spring Log

2024-11-16 19:11:43.964  INFO 22464 --- [ntContainer#0-1] r.t.d.consumer.RabbitMQJsonConsumer      : Received JSON message here ->
{
  "sensorId" : "12345",
  "value" : 67.5,
  "unit" : "Celsius",
  "timestamp" : "2024-11-16T18:30:00Z"
}

You can see the Spike in Rabbit UI

If you want to see the queue message by RabbitMQ UI

you need to comment out RabbitMQJsonConsumer.java

From

@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

To

//@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

Then build jar and run it again

  • The @RabbitListener annotation makes the consumer automatically consume messages from the queue as soon as they arrive.
  • When the consumer processes a message, it removes it from the queue, leaving the queue empty.
  • By commenting out @RabbitListener, the consumer is disabled, and messages remain in the queue for inspection.
  • This behavior ensures that messages are not lost but immediately processed unless explicitly paused.
  • For debugging, disabling the consumer allows you to verify message flow and queue content in RabbitMQ.

Good Luck!

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1745645857a4637966.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信