java - ActiveMQ multicast topic does round-robin instead - Stack Overflow

I'm trying to implement a pub-sub with embedded ActiveMQ Artemis 2.40.0.I create the address and

I'm trying to implement a pub-sub with embedded ActiveMQ Artemis 2.40.0. I create the address and write messages to it, but I receive the messages on different clients in a round-robin fashion.

My broker.xml:

<configuration xmlns:xsi="; xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
        </acceptors>

        <addresses>
            <address name="address1">
                <multicast>
                    <queue name="q1" max-consumers="99">
                        <durable>false</durable>
                    </queue>
                </multicast>
            </address>
        </addresses>
    </core>
</configuration>

My code:

import com.shorcan.log.LogUtil;
import com.shorcan.util.SleepUtil;
import .apache.activemq.artemis.api.core.ActiveMQException;
import .apache.activemq.artemis.api.core.client.*;
import .apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import .slf4j.Logger;
import .slf4j.LoggerFactory;

import java.util.Timer;
import java.util.TimerTask;

public class Example {
    private static final Logger log_ = LoggerFactory.getLogger(Example.class);

    public Example() throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:cfg/broker.xml");
        broker.start();
    }

     public void createProducer() throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();

        ClientProducer producer = session.createProducer("address1");

        TimerTask task = new TimerTask() {
            public void run() {
                log_.info("Sending message...");
                ClientMessage message = session.createMessage(false);
                message.getBodyBuffer().writeString("Hi, There");
                try {
                    producer.send(message);
                } catch (ActiveMQException e) {
                    log_.warn("Error sending message: " + e.getMessage());
                }
            }
        };
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(task, 5000, 5000);
    }

    public void createConsumer(int id) throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();
        ClientConsumer consumer = session.createConsumer("q1");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage clientMessage) {
                log_.info(">>> consumer {} received {}", id, clientMessage);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        LogUtil.initFrom("cfg/logging.properties");
        Example example = new Example();

        SleepUtil.waitFor(5);
        example.createProducer();

        example.createConsumer(0);
        example.createConsumer(1);
        example.createConsumer(2);
    }
}

When I run this, I get a message on a different consumer every 5s (0, 1, 2, 0, 1, 2, etc)

Can anyone point out where I've gone wrong?

I'm trying to implement a pub-sub with embedded ActiveMQ Artemis 2.40.0. I create the address and write messages to it, but I receive the messages on different clients in a round-robin fashion.

My broker.xml:

<configuration xmlns:xsi="http://www.w3./2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
        </acceptors>

        <addresses>
            <address name="address1">
                <multicast>
                    <queue name="q1" max-consumers="99">
                        <durable>false</durable>
                    </queue>
                </multicast>
            </address>
        </addresses>
    </core>
</configuration>

My code:

import com.shorcan.log.LogUtil;
import com.shorcan.util.SleepUtil;
import .apache.activemq.artemis.api.core.ActiveMQException;
import .apache.activemq.artemis.api.core.client.*;
import .apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import .slf4j.Logger;
import .slf4j.LoggerFactory;

import java.util.Timer;
import java.util.TimerTask;

public class Example {
    private static final Logger log_ = LoggerFactory.getLogger(Example.class);

    public Example() throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:cfg/broker.xml");
        broker.start();
    }

     public void createProducer() throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();

        ClientProducer producer = session.createProducer("address1");

        TimerTask task = new TimerTask() {
            public void run() {
                log_.info("Sending message...");
                ClientMessage message = session.createMessage(false);
                message.getBodyBuffer().writeString("Hi, There");
                try {
                    producer.send(message);
                } catch (ActiveMQException e) {
                    log_.warn("Error sending message: " + e.getMessage());
                }
            }
        };
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(task, 5000, 5000);
    }

    public void createConsumer(int id) throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();
        ClientConsumer consumer = session.createConsumer("q1");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage clientMessage) {
                log_.info(">>> consumer {} received {}", id, clientMessage);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        LogUtil.initFrom("cfg/logging.properties");
        Example example = new Example();

        SleepUtil.waitFor(5);
        example.createProducer();

        example.createConsumer(0);
        example.createConsumer(1);
        example.createConsumer(2);
    }
}

When I run this, I get a message on a different consumer every 5s (0, 1, 2, 0, 1, 2, etc)

Can anyone point out where I've gone wrong?

Share Improve this question edited Mar 28 at 14:57 Justin Bertram 35.4k6 gold badges26 silver badges49 bronze badges asked Mar 25 at 19:18 GordGord 176 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 1

Correct me if I'm wrong, but you are using a multicast address and defining a singular queue q1 under that. This unfortunately would put you into a competing consumer situation.

Update your broker.xml thusly (note <multicast /> with no defined queue)

<configuration xmlns:xsi="http://www.w3./2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
        </acceptors>

        <addresses>
            <address name="address1">
                <multicast />
            </address>
        </addresses>
    </core>
</configuration>

In your create client segment you have to do a little bit of trickery like this:

import java.util.UUID;

public class Example {
    public void createConsumer(int id) throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();

        String consumerQueueName = "address1-"+UUID.randomUUID();
        QueueConfiguration topicQueueConfiguration =
            QueueConfiguration.of(consumerQueueName)
                                .setAddress("address1")
                                .setDurable(false)
                                .setTransient(true)
                                .setRoutingType(RoutingType.MULTICAST);
        session.createQueue(topicQueueConfiguration);
                                

        ClientConsumer consumer = session.createConsumer(consumerQueueName);
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage clientMessage) {
                log_.info(">>> consumer {} received {}", id, clientMessage);
            }
        });
    }
}

It has been a hot minute since I last did this, however you have to dynamically create an ANYCAST queue attached to the address that is transient and will represent a specific client listening. It should auto-delete upon closure of the client session.

The addressing used by artemis is more or less this:

You have an address which is a destination that you can send messages to. You have the option of routing out as ANYCAST (traditional point-to-point) or MULTICAST (traditional pub-sub).

Each client that wants to get a DISTINCT message must have a DISTINCT message queue to receive on. That is why I had to define the QueueConfiguration with a UUID and that SHOULD solve your issue.

If you are doing this in a production targeted application, you will need to do some more work on ensuring deletion of the queue associated with the consumer on termination, otherwise the queue will persist as a multicast member which may degrade performance over time.

Hopefully this will address your situation.

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744173928a4561664.html

相关推荐

  • java - ActiveMQ multicast topic does round-robin instead - Stack Overflow

    I'm trying to implement a pub-sub with embedded ActiveMQ Artemis 2.40.0.I create the address and

    9天前
    30

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信