I'm trying to implement a pub-sub with embedded ActiveMQ Artemis 2.40.0. I create the address and write messages to it, but I receive the messages on different clients in a round-robin fashion.
My broker.xml:

    <configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
        <core xmlns="urn:activemq:core">
            <persistence-enabled>false</persistence-enabled>
            <security-enabled>false</security-enabled>
            <acceptors>
                <acceptor name="in-vm">vm://0</acceptor>
            </acceptors>
            <addresses>
                <address name="address1">
                    <multicast>
                        <queue name="q1" max-consumers="99">
                            <durable>false</durable>
                        </queue>
                    </multicast>
                </address>
            </addresses>
        </core>
    </configuration>

My code:

    import com.shorcan.log.LogUtil;
    import com.shorcan.util.SleepUtil;
    import org.apache.activemq.artemis.api.core.ActiveMQException;
    import org.apache.activemq.artemis.api.core.client.*;
    import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Timer;
    import java.util.TimerTask;

    public class Example {
        private static final Logger log_ = LoggerFactory.getLogger(Example.class);

        public Example() throws Exception {
            EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
            broker.setConfigResourcePath("file:cfg/broker.xml");
            broker.start();
        }

        public void createProducer() throws Exception {
            ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.start();
            ClientProducer producer = session.createProducer("address1");
            TimerTask task = new TimerTask() {
                public void run() {
                    log_.info("Sending message...");
                    ClientMessage message = session.createMessage(false);
                    message.getBodyBuffer().writeString("Hi, There");
                    try {
                        producer.send(message);
                    } catch (ActiveMQException e) {
                        log_.warn("Error sending message: " + e.getMessage());
                    }
                }
            };
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(task, 5000, 5000);
        }

        public void createConsumer(int id) throws Exception {
            ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.start();
            ClientConsumer consumer = session.createConsumer("q1");
            consumer.setMessageHandler(new MessageHandler() {
                @Override
                public void onMessage(ClientMessage clientMessage) {
                    log_.info(">>> consumer {} received {}", id, clientMessage);
                }
            });
        }

        public static void main(String[] args) throws Exception {
            LogUtil.initFrom("cfg/logging.properties");
            Example example = new Example();
            SleepUtil.waitFor(5);
            example.createProducer();
            example.createConsumer(0);
            example.createConsumer(1);
            example.createConsumer(2);
        }
    }

When I run this, each message arrives on only one consumer, and the receiving consumer rotates every 5 s (0, 1, 2, 0, 1, 2, etc.).
Can anyone point out where I've gone wrong?
Asked Mar 25 at 19:18 by Gord; edited Mar 28 at 14:57 by Justin Bertram.

1 Answer
Correct me if I'm wrong, but you are using a multicast address with a single queue, q1, defined under it. Unfortunately, that puts you in a competing-consumer situation: all three consumers read from the same queue, so each message is delivered to only one of them.
Update your broker.xml like so (note the <multicast /> element with no queue defined):

    <configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
        <core xmlns="urn:activemq:core">
            <persistence-enabled>false</persistence-enabled>
            <security-enabled>false</security-enabled>
            <acceptors>
                <acceptor name="in-vm">vm://0</acceptor>
            </acceptors>
            <addresses>
                <address name="address1">
                    <multicast />
                </address>
            </addresses>
        </core>
    </configuration>

In your createConsumer method you then have to do a little bit of trickery so that every consumer gets its own queue, like this:
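
    import java.util.UUID;

    import org.apache.activemq.artemis.api.core.QueueConfiguration;
    import org.apache.activemq.artemis.api.core.RoutingType;
    import org.apache.activemq.artemis.api.core.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Example {
        private static final Logger log_ = LoggerFactory.getLogger(Example.class);

        // The constructor, createProducer() and main() stay as in the question.

        public void createConsumer(int id) throws Exception {
            ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.start();

            // Unique queue name, so each consumer gets a queue of its own on the address.
            String consumerQueueName = "address1-" + UUID.randomUUID();
            QueueConfiguration topicQueueConfiguration =
                    QueueConfiguration.of(consumerQueueName)
                            .setAddress("address1")
                            .setDurable(false)
                            .setTransient(true)
                            .setRoutingType(RoutingType.MULTICAST);
            session.createQueue(topicQueueConfiguration);

            // Consume from the per-consumer queue instead of the shared q1.
            ClientConsumer consumer = session.createConsumer(consumerQueueName);
            consumer.setMessageHandler(new MessageHandler() {
                @Override
                public void onMessage(ClientMessage clientMessage) {
                    log_.info(">>> consumer {} received {}", id, clientMessage);
                }
            });
        }
    }
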
It has been a hot minute since I last did this, but the idea is that each client dynamically creates its own MULTICAST queue attached to the address; that queue is transient and represents one specific listening client. It should auto-delete upon closure of the client session.
The addressing model used by Artemis is more or less this:
You have an address, which is a destination that you can send messages to. Messages are routed out of it as either ANYCAST (traditional point-to-point) or MULTICAST (traditional pub-sub).
Each client that wants its own copy of every message must have its own DISTINCT queue to receive on. That is why I defined the QueueConfiguration with a UUID in the queue name, and that SHOULD solve your issue.
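To make the difference concrete, here is a small plain-Java toy model of that routing behaviour. It is only a sketch and does not use the Artemis API: ToyQueue and ToyMulticastAddress are hypothetical names invented for illustration, and the toy queue simply rotates through its consumers and drops messages when nobody is attached, which is a simplification of what a real broker does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Artemis classes.
class RoutingSketch {

    /** A queue hands each message to exactly one of its consumers, in rotation. */
    public static final class ToyQueue {
        private final List<List<String>> consumers = new ArrayList<>();
        private int next = 0;

        /** Registers a consumer and returns the list that collects what it receives. */
        public List<String> addConsumer() {
            List<String> inbox = new ArrayList<>();
            consumers.add(inbox);
            return inbox;
        }

        public void deliver(String message) {
            if (consumers.isEmpty()) {
                return; // toy simplification: drop instead of buffering
            }
            consumers.get(next).add(message);
            next = (next + 1) % consumers.size();
        }
    }

    /** A multicast address copies every message to every queue bound to it. */
    public static final class ToyMulticastAddress {
        private final Map<String, ToyQueue> queues = new LinkedHashMap<>();

        /** Returns the queue with this name, creating it on first use. */
        public ToyQueue bindQueue(String name) {
            return queues.computeIfAbsent(name, n -> new ToyQueue());
        }

        public void send(String message) {
            for (ToyQueue queue : queues.values()) {
                queue.deliver(message);
            }
        }
    }

    public static void main(String[] args) {
        // Layout from the question: one queue shared by three consumers.
        ToyMulticastAddress shared = new ToyMulticastAddress();
        ToyQueue q1 = shared.bindQueue("q1");
        List<String> a = q1.addConsumer();
        List<String> b = q1.addConsumer();
        List<String> c = q1.addConsumer();
        for (int i = 1; i <= 3; i++) {
            shared.send("m" + i);
        }
        // prints: shared queue: [m1] [m2] [m3]
        System.out.println("shared queue: " + a + " " + b + " " + c);

        // Layout from this answer: one queue per consumer.
        ToyMulticastAddress fanOut = new ToyMulticastAddress();
        List<String> x = fanOut.bindQueue("address1-x").addConsumer();
        List<String> y = fanOut.bindQueue("address1-y").addConsumer();
        List<String> z = fanOut.bindQueue("address1-z").addConsumer();
        for (int i = 1; i <= 3; i++) {
            fanOut.send("m" + i);
        }
        // prints: per-consumer queues: [m1, m2, m3] [m1, m2, m3] [m1, m2, m3]
        System.out.println("per-consumer queues: " + x + " " + y + " " + z);
    }
}
```

In the first layout the three consumers split the messages between them, which matches the 0, 1, 2, 0, 1, 2 pattern from the question; in the second, every consumer sees every message because the address fans out to one queue per consumer.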
If you are doing this in a production-targeted application, you will need to do some more work to make sure the queue associated with a consumer is deleted when that consumer terminates; otherwise the queue persists as a multicast member of the address, which may degrade performance over time.
Hopefully this will address your situation.