I am trying to build a simple Node.js client using the amqp library that opens a single connection and then a single channel to a RabbitMQ server. I want to reuse the same connection and channel to send multiple messages. The main problem is that I don't want to write my entire code inside the callback function of the createChannel() function.
How do I reuse the channel outside of the callback function and make sure the callback function has finished before I use the channel?
I've tried both the callback way and the promise way but I can't make either of them work. When using the callback method I run into the described problem.
When using promises, I have the problem that I can't keep a reference of the connection and channel outside of the .then() function because the passed variables get destroyed after setting up the connection and channel.
// Callback style: open one connection, then one channel on that connection.
// `conn` and `ch` are only in scope inside their respective callbacks.
amqp.connect('amqp://localhost', (err, conn) => {
// Connection failed: log the error and stop, so no channel is requested.
if (err !== null) return console.warn(err);
console.log('Created connection!');
conn.createChannel((err, ch) => {
// This `err` shadows the outer one and refers to the channel error only.
if (err !== null) return console.warn(err);
console.log('Created channel!');
// This is where I would need to write the code that uses the variable "ch",
// but I want to move the code outside of this structure, while making sure
// this callback completes before I try using "ch".
});
});
// Promise style: connect, create a channel, then assert a queue.
// Note that `conn` is not stored anywhere; only the channel and the queue
// name are kept, and only as properties of `this`.
amqp.connect('amqp://localhost').then((conn) => {
return conn.createChannel();
}).then((ch) => {
// Arrow functions take `this` from the enclosing scope, so where `channel`
// ends up depends on where this snippet is placed — TODO confirm.
this.channel = ch;
// Empty queue name with empty options; presumably the broker picks the name
// and reports it back as `ok.queue` — verify against the amqplib docs.
return ch.assertQueue('', {}).then((ok) => {
// The assignment's value (the queue name) is also what the chain resolves to.
return this.queueName = ok.queue;
});
// Any rejection in the chain above is only logged as a warning.
}).catch(console.warn);
I am trying to build a simple Node.js client using the amqp library that opens a single connection and then a single channel to a RabbitMQ server. I want to reuse the same connection and channel to send multiple messages. The main problem is that I don't want to write my entire code inside the callback function of the createChannel() function.
How do I reuse the channel outside of the callback function and make sure the callback function has finished before I use the channel?
I've tried both the callback way and the promise way but I can't make either of them work. When using the callback method I run into the described problem.
When using promises, I have the problem that I can't keep a reference of the connection and channel outside of the .then() function because the passed variables get destroyed after setting up the connection and channel.
// Callback style: open one connection, then one channel on that connection.
// `conn` and `ch` are only in scope inside their respective callbacks.
amqp.connect('amqp://localhost', (err, conn) => {
// Connection failed: log the error and stop, so no channel is requested.
if (err !== null) return console.warn(err);
console.log('Created connection!');
conn.createChannel((err, ch) => {
// This `err` shadows the outer one and refers to the channel error only.
if (err !== null) return console.warn(err);
console.log('Created channel!');
// This is where I would need to write the code that uses the variable "ch",
// but I want to move the code outside of this structure, while making sure
// this callback completes before I try using "ch".
});
});
// Promise style: connect, create a channel, then assert a queue.
// Note that `conn` is not stored anywhere; only the channel and the queue
// name are kept, and only as properties of `this`.
amqp.connect('amqp://localhost').then((conn) => {
return conn.createChannel();
}).then((ch) => {
// Arrow functions take `this` from the enclosing scope, so where `channel`
// ends up depends on where this snippet is placed — TODO confirm.
this.channel = ch;
// Empty queue name with empty options; presumably the broker picks the name
// and reports it back as `ok.queue` — verify against the amqplib docs.
return ch.assertQueue('', {}).then((ok) => {
// The assignment's value (the queue name) is also what the chain resolves to.
return this.queueName = ok.queue;
});
// Any rejection in the chain above is only logged as a warning.
}).catch(console.warn);
Share
Improve this question
asked Mar 29, 2019 at 12:44
Steve MelonsSteve Melons
832 silver badges9 bronze badges
3
-
Why don't you use
async/await
? const conn = await amqp.connect('amqp://localhost'); const ch = await conn.createChannel(); // after that you can use ch anywhere – Егор Лебедев Commented Mar 29, 2019 at 13:06 - Where exactly would i use that? I've tried but I don't exactly know how the best way would be. – Steve Melons Commented Mar 29, 2019 at 13:09
- I answered for better format of text – Егор Лебедев Commented Mar 29, 2019 at 13:12
2 Answers
Reset to default 6 Why don't you use async/await?
// NOTE: `await` is only valid inside an `async` function or at the top level
// of an ES module, so these lines have to live in one of those.
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
// After that you can use `ch` anywhere in the same scope; don't forget to
// handle exceptions (both awaits above can reject).
Also, if you use amqplib, don't forget to handle the `close` and internal `error` events, for example like this:
// Named listeners for the connection's lifecycle events: an `error` is logged
// together with its payload, a `close` is logged as a bare marker line.
const reportAmqpError = (err) => {
  console.log('AMQP:Error:', err);
};
const reportAmqpClosed = () => {
  console.log("AMQP:Closed");
};

conn.on('error', reportAmqpError);
conn.on('close', reportAmqpClosed);
Try with a class, like this:
RabbitConnection.js
const amqp = require('amqplib');
// Connection settings for the RabbitMQ broker. RabbitConnection.createConnection()
// interpolates most of these into the AMQP URL it connects to.
const RabbitSettings = {
// URL scheme.
protocol: 'amqp',
// Broker host and port.
hostname: 'localhost',
port: 5672,
// Credentials placed in the URL's user-info part.
username: 'guest',
password: 'guest',
// NOTE(review): not read by any code visible in this file — confirm whether it is needed.
authMechanism: 'AMQPLAIN',
// Appended to the URL as its path.
vhost: '/',
// Queue asserted right after the channel is created.
queue: 'test'
}
/**
 * Process-wide holder for one RabbitMQ connection and one channel.
 *
 * The live handles are stored on the class itself
 * (`RabbitConnection.connection`, `RabbitConnection.channel`) because callers
 * use the static methods. Connecting is asynchronous, so the in-flight attempt
 * is cached in `RabbitConnection.connecting`; `sendMessage` awaits it, which
 * guarantees the channel exists before it is used.
 */
class RabbitConnection {
  /** Starts connecting in the background; prefer `getInstance()`. */
  constructor() {
    // Kept for backward compatibility only: the static methods store the real
    // handles on the class, not on the instance.
    this.connection = null;
    this.channel = null;
    // Never rejects (failures are logged inside), so it is safe not to await.
    RabbitConnection.createConnection();
  }

  /**
   * Returns the singleton, creating it (and starting to connect) on first use.
   * @returns {RabbitConnection}
   */
  static getInstance() {
    if (!RabbitConnection.instance) {
      RabbitConnection.instance = new RabbitConnection();
    }
    return RabbitConnection.instance;
  }

  /**
   * Ensures a connection and channel exist, sharing a single attempt between
   * concurrent callers. Failures are logged, never thrown.
   * @returns {Promise<boolean>} true once the channel is ready, false on failure.
   */
  static async createConnection() {
    if (!RabbitConnection.connecting) {
      const attempt = RabbitConnection.openChannel();
      RabbitConnection.connecting = attempt;
      // Do not cache a failed attempt, otherwise every later send would fail
      // forever. `.then` runs asynchronously, so it cannot be overwritten by
      // the assignment above.
      attempt.then((isOpen) => {
        if (!isOpen && RabbitConnection.connecting === attempt) {
          RabbitConnection.connecting = null;
        }
      });
    }
    return RabbitConnection.connecting;
  }

  /**
   * Internal helper for `createConnection`: performs one connect attempt.
   * @returns {Promise<boolean>} true on success, false if any step failed.
   */
  static async openChannel() {
    let connection = null;
    try {
      const { protocol, username, password, hostname, port, vhost } = RabbitSettings;
      // Escape the credentials so characters such as '@' or ':' cannot break the URL.
      const url = `${protocol}://${encodeURIComponent(username)}:${encodeURIComponent(password)}@${hostname}:${port}${vhost}`;
      connection = await amqp.connect(url);
      const opened = connection;

      // An 'error' event without a listener would crash the process.
      opened.on('error', (err) => {
        console.log('AMQP:Error:', err);
      });
      // Drop the stale handles so the next send reconnects. The identity check
      // keeps an old connection's late 'close' from wiping a newer one.
      opened.on('close', () => {
        console.log('AMQP:Closed');
        if (RabbitConnection.connection === opened) {
          RabbitConnection.connection = null;
          RabbitConnection.channel = null;
          RabbitConnection.connecting = null;
        }
      });

      const channel = await opened.createChannel();
      // Awaited so that a failed assertion is reported here instead of
      // surfacing as an unhandled rejection.
      await channel.assertQueue(RabbitSettings.queue);

      RabbitConnection.connection = opened;
      RabbitConnection.channel = channel;
      console.log('Connection to RabbitMQ established');
      return true;
    } catch (error) {
      console.log(error);
      // Do not leak a connection that opened but could not be fully set up.
      if (connection) {
        try {
          await connection.close();
        } catch (closeError) {
          console.log(closeError);
        }
      }
      return false;
    }
  }

  /**
   * Sends a message to a queue, waiting for the channel to be ready first.
   * Errors are logged rather than thrown, so callers may fire and forget.
   * @param {string|Buffer} message - Payload, converted with `Buffer.from`.
   * @param {string} queueName - Target queue.
   * @returns {Promise<boolean|undefined>} The value returned by `sendToQueue`,
   *   or undefined when the message could not be sent.
   */
  static async sendMessage(message, queueName) {
    try {
      await RabbitConnection.createConnection();
      if (!RabbitConnection.channel) {
        throw new Error('RabbitMQ channel is not available');
      }
      const msg = RabbitConnection.channel.sendToQueue(queueName, Buffer.from(message));
      console.log('Message sent to RabbitMQ');
      return msg;
    } catch (error) {
      console.log(error);
      return undefined;
    }
  }
}
module.exports = { RabbitConnection };
ServerExpress.js
const express = require('express');
const { RabbitConnection } = require('./RabbitConnection');
/**
 * Builds the Express app, registers the single `GET /` route and starts
 * listening on port 3000. Returns nothing.
 */
const serverUp = () => {
  const server = express();

  // GET /: hand a message to RabbitMQ (fire and forget) and reply immediately;
  // the response does not wait for, or depend on, the publish result.
  const handleRoot = (_request, response) => {
    RabbitConnection.sendMessage('Hello World', 'test');
    response.send('Hello World!');
  };

  // Invoked by Express once the server is accepting connections.
  const announceReady = () => {
    console.log('Server is running on port 3000');
  };

  server.get('/', handleRoot);
  server.listen(3000, announceReady);
};
module.exports = { serverUp };
index.js
const { RabbitConnection } = require("./RabbitConnection");
const { serverUp } = require("./ServerExpress");
// Start the Express server (it listens on port 3000).
serverUp();
// Create the RabbitConnection singleton; its constructor kicks off the
// asynchronous connection to RabbitMQ, which is not awaited here.
RabbitConnection.getInstance();
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744307512a4567786.html
评论列表(0条)