I'm trying to use the Azure ServiceBusSessionProcessor (Azure.Messaging.ServiceBus) to processes tracking-related messages from an application. Due to different reasons (not to bore you with the details) we would like to batch process them by sessionId to avoid killing the database.
To do this, we simply store the messages in a dictionary until they are ready to be processed (handled by a timer event).
However, it seems we're not allowed to complete a message from another thread. If I complete the message in the message handler, it works - but kills the performance.
If I try to do it from another thread, I immediately get the following error:
Error: "ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
I interpret the ServiceBusSessionProcessorOptions.AutoCompleteMessages Property as I should be allowed to complete the message as I see fit (inside the lock duration of course), but fore some reason I'm not allowed to.
Am I doing something weird here or am I trying to achieve something that is simply not possible using the ServiceBusSessionProcessor?
Br, Mikael
Here's some sample code for demo purposes
using Azure.Messaging.ServiceBus;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;
namespace ServiceBus.DLQ.Reader
{
public class Demo
{
public static async Task Main(string[] args)
{
DempoSessionProcessReader reader = new DempoSessionProcessReader(
new ServiceBusClient("Endpoint=sb://*****.servicebus.windows/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*****"), "mikes.test.queue.session");
try
{
await reader.CreateSessionProcessors(10, 10, 10, 2, false);
//To stop!
Console.ReadKey();
return;
}
catch (Exception ex)
{
}
}
}
/// <summary>
/// A class for grouping all messages for a particula session.
/// The timestamp is to keep track of when to start processing them (persist)
/// </summary>
public class GroupedSession
{
public DateTime Timestamp { get; set; } //When this was created (i.e. when the first message for a message was added
public List<ProcessSessionMessageEventArgs> Messages { get; set; }
public GroupedSession()
{
}
}
public class DempoSessionProcessReader
{
List<ServiceBusSessionProcessor> processorList =new List<ServiceBusSessionProcessor>();
List<Guid> guids = new List<Guid>();
public ServiceBusClient _client;
public string Queue { get; set; }
private static bool _automcomplete = false;
/// <summary>
/// A list of messages with the sessionId as the key
/// </summary>
public static IDictionary<string, GroupedSession> _messages = new Dictionary<string, GroupedSession>();
static object _lock = new object();
private delegate void FlowHandlerDelegate(string sessionId, ProcessSessionMessageEventArgs[] messages);
static Timer _timer = new Timer();
public DempoSessionProcessReader(ServiceBusClient client, string queue)
{
_client = client;
Queue = queue;
//Generate some guids for testing purposes which we can use when preloading the queue with messages
for(int i=0;i<1000;i++)
guids.Add(Guid.NewGuid());
_timer.Elapsed += ProcessMessageTimerTick;
_timer.Interval = 2000;
_timer.Enabled = true;
_timer.Start();
}
public async Task CreateSessionProcessors(int numberOfProcesses, int concurrentSessionsPerProcessor, int prefetchCount, int sessionTimeout, bool autocompleteMessage)
{
ServiceBusSessionProcessor processor = null!;
for (int i=0;i<numberOfProcesses;i++)
{
_automcomplete = autocompleteMessage;
processor = _client.CreateSessionProcessor("mikes.test.queue.session", new ServiceBusSessionProcessorOptions()
{
ReceiveMode = ServiceBusReceiveMode.PeekLock,
PrefetchCount = prefetchCount,
AutoCompleteMessages = autocompleteMessage,
SessionIdleTimeout = TimeSpan.FromSeconds(sessionTimeout),
MaxConcurrentSessions = concurrentSessionsPerProcessor
});
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
processorList.Add(processor);
}
for (int i = 0; i < numberOfProcesses; i++) {
await processorList[i].StartProcessingAsync();
}
}
Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception);
return Task.CompletedTask;
}
async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
string sessionId = args.Message.SessionId;
lock (_lock)
{
if (!_messages.ContainsKey(sessionId))
_messages.Add(sessionId, new GroupedSession()
{
Timestamp = DateTime.Now,
Messages = new List<ProcessSessionMessageEventArgs>() { args }
});
else
_messages[sessionId].Messages.Add(args);
//PersistMessages(null, null); //We can do it here, but it kills the throughput and gives us other issues
}
await Task.CompletedTask;
}
private void ProcessMessageTimerTick(object source, ElapsedEventArgs e)
{
//Loop over all session Id's and see if any of them are older than 5 seconds
//In that case, remove it from the dictionary, process/persist them and them complete the messages
for (int i = 0; i < _messages.Count; i++) {
if(_messages.Count > i)
{
if (_messages.ElementAt(i).Value.Timestamp.AddSeconds(5) < DateTime.Now)
{
lock (_lock)
{
string sessionId = _messages.ElementAt(i).Key;
ProcessSessionMessageEventArgs[] array = new ProcessSessionMessageEventArgs[_messages[sessionId].Messages.Count];
_messages[sessionId].Messages.CopyTo(array, 0);
Array.Sort(array);
CompleteMessageList(array).GetAwaiter().GetResult(); //Not allowed to complete here :/
_messages.Remove(sessionId);
}
}
}
}
}
private async Task CompleteMessageList(ProcessSessionMessageEventArgs[] messages)
{
try
{
if (!_automcomplete) //Just for testing purposes
{
for (int i = 0; i < messages.Length; i++)
{
await messages[i].CompleteMessageAsync(messages[i].Message);
}
}
}
catch(Exception ex)
{
//"ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
}
}
}
public class DateSorter : IComparer
{
// Calls CaseInsensitiveComparer.Compare with the parameters reversed.
int IComparer.Compare(object x, object y)
{
DateTime a = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)x).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
DateTime b = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)y).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
if (a > b)
return 1;
if (a < b)
return -1;
return 0;
}
}
}
I'm trying to use the Azure ServiceBusSessionProcessor (Azure.Messaging.ServiceBus) to processes tracking-related messages from an application. Due to different reasons (not to bore you with the details) we would like to batch process them by sessionId to avoid killing the database.
To do this, we simply store the messages in a dictionary until they are ready to be processed (handled by a timer event).
However, it seems we're not allowed to complete a message from another thread. If I complete the message in the message handler, it works - but kills the performance.
If I try to do it from another thread, I immediately get the following error:
Error: "ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
I interpret the ServiceBusSessionProcessorOptions.AutoCompleteMessages Property as I should be allowed to complete the message as I see fit (inside the lock duration of course), but fore some reason I'm not allowed to.
Am I doing something weird here or am I trying to achieve something that is simply not possible using the ServiceBusSessionProcessor?
Br, Mikael
Here's some sample code for demo purposes
using Azure.Messaging.ServiceBus;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;
namespace ServiceBus.DLQ.Reader
{
public class Demo
{
public static async Task Main(string[] args)
{
DempoSessionProcessReader reader = new DempoSessionProcessReader(
new ServiceBusClient("Endpoint=sb://*****.servicebus.windows/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*****"), "mikes.test.queue.session");
try
{
await reader.CreateSessionProcessors(10, 10, 10, 2, false);
//To stop!
Console.ReadKey();
return;
}
catch (Exception ex)
{
}
}
}
/// <summary>
/// A class for grouping all messages for a particula session.
/// The timestamp is to keep track of when to start processing them (persist)
/// </summary>
public class GroupedSession
{
public DateTime Timestamp { get; set; } //When this was created (i.e. when the first message for a message was added
public List<ProcessSessionMessageEventArgs> Messages { get; set; }
public GroupedSession()
{
}
}
public class DempoSessionProcessReader
{
List<ServiceBusSessionProcessor> processorList =new List<ServiceBusSessionProcessor>();
List<Guid> guids = new List<Guid>();
public ServiceBusClient _client;
public string Queue { get; set; }
private static bool _automcomplete = false;
/// <summary>
/// A list of messages with the sessionId as the key
/// </summary>
public static IDictionary<string, GroupedSession> _messages = new Dictionary<string, GroupedSession>();
static object _lock = new object();
private delegate void FlowHandlerDelegate(string sessionId, ProcessSessionMessageEventArgs[] messages);
static Timer _timer = new Timer();
public DempoSessionProcessReader(ServiceBusClient client, string queue)
{
_client = client;
Queue = queue;
//Generate some guids for testing purposes which we can use when preloading the queue with messages
for(int i=0;i<1000;i++)
guids.Add(Guid.NewGuid());
_timer.Elapsed += ProcessMessageTimerTick;
_timer.Interval = 2000;
_timer.Enabled = true;
_timer.Start();
}
public async Task CreateSessionProcessors(int numberOfProcesses, int concurrentSessionsPerProcessor, int prefetchCount, int sessionTimeout, bool autocompleteMessage)
{
ServiceBusSessionProcessor processor = null!;
for (int i=0;i<numberOfProcesses;i++)
{
_automcomplete = autocompleteMessage;
processor = _client.CreateSessionProcessor("mikes.test.queue.session", new ServiceBusSessionProcessorOptions()
{
ReceiveMode = ServiceBusReceiveMode.PeekLock,
PrefetchCount = prefetchCount,
AutoCompleteMessages = autocompleteMessage,
SessionIdleTimeout = TimeSpan.FromSeconds(sessionTimeout),
MaxConcurrentSessions = concurrentSessionsPerProcessor
});
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
processorList.Add(processor);
}
for (int i = 0; i < numberOfProcesses; i++) {
await processorList[i].StartProcessingAsync();
}
}
Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception);
return Task.CompletedTask;
}
async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
string sessionId = args.Message.SessionId;
lock (_lock)
{
if (!_messages.ContainsKey(sessionId))
_messages.Add(sessionId, new GroupedSession()
{
Timestamp = DateTime.Now,
Messages = new List<ProcessSessionMessageEventArgs>() { args }
});
else
_messages[sessionId].Messages.Add(args);
//PersistMessages(null, null); //We can do it here, but it kills the throughput and gives us other issues
}
await Task.CompletedTask;
}
private void ProcessMessageTimerTick(object source, ElapsedEventArgs e)
{
//Loop over all session Id's and see if any of them are older than 5 seconds
//In that case, remove it from the dictionary, process/persist them and them complete the messages
for (int i = 0; i < _messages.Count; i++) {
if(_messages.Count > i)
{
if (_messages.ElementAt(i).Value.Timestamp.AddSeconds(5) < DateTime.Now)
{
lock (_lock)
{
string sessionId = _messages.ElementAt(i).Key;
ProcessSessionMessageEventArgs[] array = new ProcessSessionMessageEventArgs[_messages[sessionId].Messages.Count];
_messages[sessionId].Messages.CopyTo(array, 0);
Array.Sort(array);
CompleteMessageList(array).GetAwaiter().GetResult(); //Not allowed to complete here :/
_messages.Remove(sessionId);
}
}
}
}
}
private async Task CompleteMessageList(ProcessSessionMessageEventArgs[] messages)
{
try
{
if (!_automcomplete) //Just for testing purposes
{
for (int i = 0; i < messages.Length; i++)
{
await messages[i].CompleteMessageAsync(messages[i].Message);
}
}
}
catch(Exception ex)
{
//"ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
}
}
}
public class DateSorter : IComparer
{
// Calls CaseInsensitiveComparer.Compare with the parameters reversed.
int IComparer.Compare(object x, object y)
{
DateTime a = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)x).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
DateTime b = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)y).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
if (a > b)
return 1;
if (a < b)
return -1;
return 0;
}
}
}
Share
Improve this question
edited Mar 11 at 10:26
Mike
asked Mar 11 at 10:02
MikeMike
1653 silver badges12 bronze badges
3
|
1 Answer
Reset to default 0The errors occurs due the above code attempted to complete messages from a separate background timer .
But Azure Service Bus sessions are tied to a specific ServiceBusSessionProcessor
, and once the session processing completes, the ServiceBusReceiver
is closed automatically.
So, when the timer tried to complete the messages later, the session was already closed, causing this error
Modify your dictionary:
public static IDictionary<string, (GroupedSession session, ProcessSessionMessageEventArgs args)> _messages
= new Dictionary<string, (GroupedSession, ProcessSessionMessageEventArgs)>();
Then store the session args along with messages:
lock (_lock)
{
if (!_messages.ContainsKey(sessionId))
_messages.Add(sessionId, (new GroupedSession
{
Timestamp = DateTime.Now,
Messages = new List<ProcessSessionMessageEventArgs> { args }
}, args));
else
_messages[sessionId].session.Messages.Add(args);
}
Then, when processing messages, use:
await _messages[sessionId].args.CompleteMessageAsync(message);
This ensures the completion happens while the receiver is still open.
You could extend the session lock before completing messages:
await args.RenewSessionLockAsync();
This ensures the session isn’t closed before you complete the messages.
Instead of completing messages from the timer event, enqueue them to a background processing task that runs within the session’s lifecycle:
ConcurrentQueue<ProcessSessionMessageEventArgs> _messageQueue = new ConcurrentQueue<ProcessSessionMessageEventArgs>();
Task.Run(async () =>
{
while (true)
{
if (_messageQueue.TryDequeue(out var args))
{
await args.CompleteMessageAsync(args.Message);
}
await Task.Delay(100);
}
});
I refer to these documents for sending and receiving session messages from a session-enabled Service Bus:
- Sending & Receiving Session Messages
- Session Processor
Please refer to this git link for the complete code.
Output:
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744802560a4594579.html
MessageHandler
is done executing as to my understanding. – Fildor Commented Mar 11 at 10:36