c# - Complete messages from another thread when using ServiceBusSessionProcesser - Stack Overflow

I'm trying to use the Azure ServiceBusSessionProcessor (Azure.Messaging.ServiceBus) to processes t

I'm trying to use the Azure ServiceBusSessionProcessor (Azure.Messaging.ServiceBus) to processes tracking-related messages from an application. Due to different reasons (not to bore you with the details) we would like to batch process them by sessionId to avoid killing the database.

To do this, we simply store the messages in a dictionary until they are ready to be processed (handled by a timer event).

However, it seems we're not allowed to complete a message from another thread. If I complete the message in the message handler, it works - but kills the performance.

If I try to do it from another thread, I immediately get the following error:

Error: "ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."

I interpret the ServiceBusSessionProcessorOptions.AutoCompleteMessages Property as I should be allowed to complete the message as I see fit (inside the lock duration of course), but fore some reason I'm not allowed to.

Am I doing something weird here or am I trying to achieve something that is simply not possible using the ServiceBusSessionProcessor?

Br, Mikael

Here's some sample code for demo purposes


using Azure.Messaging.ServiceBus;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;

namespace ServiceBus.DLQ.Reader
{
   public class Demo
    {
        public static async Task Main(string[] args)
        {
            DempoSessionProcessReader reader = new DempoSessionProcessReader(
                new ServiceBusClient("Endpoint=sb://*****.servicebus.windows/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*****"), "mikes.test.queue.session");

            try
            {
                await reader.CreateSessionProcessors(10, 10, 10, 2, false);

                //To stop!
                Console.ReadKey();
                return;

            }
            catch (Exception ex)
            {
            }
        }
    }

    /// <summary>
    /// A class for grouping all messages for a particula session.
    /// The timestamp is to keep track of when to start processing them (persist)
    /// </summary>
    public class GroupedSession
    {
        public DateTime Timestamp { get; set; } //When this was created (i.e. when the first message for a message was added
        public List<ProcessSessionMessageEventArgs> Messages { get; set; }

        public GroupedSession()
        {

        }
    }
    public class DempoSessionProcessReader
    {
        List<ServiceBusSessionProcessor> processorList =new List<ServiceBusSessionProcessor>();
        List<Guid> guids = new List<Guid>();
        public ServiceBusClient _client;
        public string Queue { get; set; }
        private static bool _automcomplete = false;

        /// <summary>
        /// A list of messages with the sessionId as the key
        /// </summary>
        public static IDictionary<string, GroupedSession> _messages = new Dictionary<string, GroupedSession>();
        static object _lock = new object();
        private delegate void FlowHandlerDelegate(string sessionId, ProcessSessionMessageEventArgs[] messages); 
        static Timer _timer = new Timer();

        public DempoSessionProcessReader(ServiceBusClient client, string queue)
        {
            _client = client; 
            Queue = queue;

            //Generate some guids for testing purposes which we can use when preloading the queue with messages
            for(int i=0;i<1000;i++)
                guids.Add(Guid.NewGuid());

            _timer.Elapsed += ProcessMessageTimerTick;
            _timer.Interval = 2000;
            _timer.Enabled = true;
            _timer.Start();
        }

        public async Task CreateSessionProcessors(int numberOfProcesses, int concurrentSessionsPerProcessor, int prefetchCount, int sessionTimeout, bool autocompleteMessage)
        {
            ServiceBusSessionProcessor processor = null!;

            for (int i=0;i<numberOfProcesses;i++)
            {
                _automcomplete = autocompleteMessage;

                processor = _client.CreateSessionProcessor("mikes.test.queue.session", new ServiceBusSessionProcessorOptions()
                {
                    ReceiveMode = ServiceBusReceiveMode.PeekLock,
                    PrefetchCount = prefetchCount,
                    AutoCompleteMessages = autocompleteMessage,
                    SessionIdleTimeout = TimeSpan.FromSeconds(sessionTimeout),
                    MaxConcurrentSessions = concurrentSessionsPerProcessor
                });

                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;

                processorList.Add(processor);
            }

            for (int i = 0; i < numberOfProcesses; i++) {
                await processorList[i].StartProcessingAsync();
            }
        }

        Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        }

        async Task MessageHandler(ProcessSessionMessageEventArgs args)
        {
            string sessionId = args.Message.SessionId;

            lock (_lock)
            {
                if (!_messages.ContainsKey(sessionId))
                    _messages.Add(sessionId, new GroupedSession() 
                    { 
                        Timestamp = DateTime.Now,
                        Messages = new List<ProcessSessionMessageEventArgs>() { args }
                    });
                else
                    _messages[sessionId].Messages.Add(args);
                
                //PersistMessages(null, null); //We can do it here, but it kills the throughput and gives us other issues
            }

            await Task.CompletedTask;
        }

        private void ProcessMessageTimerTick(object source, ElapsedEventArgs e)
        {
            //Loop over all session Id's and see if any of them are older than 5 seconds
            //In that case, remove it from the dictionary, process/persist them and them complete the messages
            for (int i = 0; i < _messages.Count; i++) {
                if(_messages.Count > i)
                {
                    if (_messages.ElementAt(i).Value.Timestamp.AddSeconds(5) < DateTime.Now)
                    {
                        lock (_lock)
                        {
                            string sessionId = _messages.ElementAt(i).Key;
                            ProcessSessionMessageEventArgs[] array = new ProcessSessionMessageEventArgs[_messages[sessionId].Messages.Count];
                            _messages[sessionId].Messages.CopyTo(array, 0);
                            Array.Sort(array);
                            CompleteMessageList(array).GetAwaiter().GetResult(); //Not allowed to complete here :/ 
                            _messages.Remove(sessionId);
                        }
                    }
                }
            }
        }

        private async Task CompleteMessageList(ProcessSessionMessageEventArgs[] messages)
        {
            try
            {
                if (!_automcomplete) //Just for testing purposes
                {
                    for (int i = 0; i < messages.Length; i++)
                    {
                        await messages[i].CompleteMessageAsync(messages[i].Message);
                    }
                }
            }
            catch(Exception ex) 
            {
                //"ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
            }
        }
    }

    public class DateSorter : IComparer
    {

        // Calls CaseInsensitiveComparer.Compare with the parameters reversed.
        int IComparer.Compare(object x, object y)
        {
            DateTime a = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)x).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
            DateTime b = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)y).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);

            if (a > b)
                return 1;
            if (a < b)
                return -1;

            return 0;
        }
    }
}

I'm trying to use the Azure ServiceBusSessionProcessor (Azure.Messaging.ServiceBus) to processes tracking-related messages from an application. Due to different reasons (not to bore you with the details) we would like to batch process them by sessionId to avoid killing the database.

To do this, we simply store the messages in a dictionary until they are ready to be processed (handled by a timer event).

However, it seems we're not allowed to complete a message from another thread. If I complete the message in the message handler, it works - but kills the performance.

If I try to do it from another thread, I immediately get the following error:

Error: "ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."

I interpret the ServiceBusSessionProcessorOptions.AutoCompleteMessages Property as I should be allowed to complete the message as I see fit (inside the lock duration of course), but fore some reason I'm not allowed to.

Am I doing something weird here or am I trying to achieve something that is simply not possible using the ServiceBusSessionProcessor?

Br, Mikael

Here's some sample code for demo purposes


using Azure.Messaging.ServiceBus;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;

namespace ServiceBus.DLQ.Reader
{
   public class Demo
    {
        public static async Task Main(string[] args)
        {
            DempoSessionProcessReader reader = new DempoSessionProcessReader(
                new ServiceBusClient("Endpoint=sb://*****.servicebus.windows/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*****"), "mikes.test.queue.session");

            try
            {
                await reader.CreateSessionProcessors(10, 10, 10, 2, false);

                //To stop!
                Console.ReadKey();
                return;

            }
            catch (Exception ex)
            {
            }
        }
    }

    /// <summary>
    /// A class for grouping all messages for a particula session.
    /// The timestamp is to keep track of when to start processing them (persist)
    /// </summary>
    public class GroupedSession
    {
        public DateTime Timestamp { get; set; } //When this was created (i.e. when the first message for a message was added
        public List<ProcessSessionMessageEventArgs> Messages { get; set; }

        public GroupedSession()
        {

        }
    }
    public class DempoSessionProcessReader
    {
        List<ServiceBusSessionProcessor> processorList =new List<ServiceBusSessionProcessor>();
        List<Guid> guids = new List<Guid>();
        public ServiceBusClient _client;
        public string Queue { get; set; }
        private static bool _automcomplete = false;

        /// <summary>
        /// A list of messages with the sessionId as the key
        /// </summary>
        public static IDictionary<string, GroupedSession> _messages = new Dictionary<string, GroupedSession>();
        static object _lock = new object();
        private delegate void FlowHandlerDelegate(string sessionId, ProcessSessionMessageEventArgs[] messages); 
        static Timer _timer = new Timer();

        public DempoSessionProcessReader(ServiceBusClient client, string queue)
        {
            _client = client; 
            Queue = queue;

            //Generate some guids for testing purposes which we can use when preloading the queue with messages
            for(int i=0;i<1000;i++)
                guids.Add(Guid.NewGuid());

            _timer.Elapsed += ProcessMessageTimerTick;
            _timer.Interval = 2000;
            _timer.Enabled = true;
            _timer.Start();
        }

        public async Task CreateSessionProcessors(int numberOfProcesses, int concurrentSessionsPerProcessor, int prefetchCount, int sessionTimeout, bool autocompleteMessage)
        {
            ServiceBusSessionProcessor processor = null!;

            for (int i=0;i<numberOfProcesses;i++)
            {
                _automcomplete = autocompleteMessage;

                processor = _client.CreateSessionProcessor("mikes.test.queue.session", new ServiceBusSessionProcessorOptions()
                {
                    ReceiveMode = ServiceBusReceiveMode.PeekLock,
                    PrefetchCount = prefetchCount,
                    AutoCompleteMessages = autocompleteMessage,
                    SessionIdleTimeout = TimeSpan.FromSeconds(sessionTimeout),
                    MaxConcurrentSessions = concurrentSessionsPerProcessor
                });

                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;

                processorList.Add(processor);
            }

            for (int i = 0; i < numberOfProcesses; i++) {
                await processorList[i].StartProcessingAsync();
            }
        }

        Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        }

        async Task MessageHandler(ProcessSessionMessageEventArgs args)
        {
            string sessionId = args.Message.SessionId;

            lock (_lock)
            {
                if (!_messages.ContainsKey(sessionId))
                    _messages.Add(sessionId, new GroupedSession() 
                    { 
                        Timestamp = DateTime.Now,
                        Messages = new List<ProcessSessionMessageEventArgs>() { args }
                    });
                else
                    _messages[sessionId].Messages.Add(args);
                
                //PersistMessages(null, null); //We can do it here, but it kills the throughput and gives us other issues
            }

            await Task.CompletedTask;
        }

        private void ProcessMessageTimerTick(object source, ElapsedEventArgs e)
        {
            //Loop over all session Id's and see if any of them are older than 5 seconds
            //In that case, remove it from the dictionary, process/persist them and them complete the messages
            for (int i = 0; i < _messages.Count; i++) {
                if(_messages.Count > i)
                {
                    if (_messages.ElementAt(i).Value.Timestamp.AddSeconds(5) < DateTime.Now)
                    {
                        lock (_lock)
                        {
                            string sessionId = _messages.ElementAt(i).Key;
                            ProcessSessionMessageEventArgs[] array = new ProcessSessionMessageEventArgs[_messages[sessionId].Messages.Count];
                            _messages[sessionId].Messages.CopyTo(array, 0);
                            Array.Sort(array);
                            CompleteMessageList(array).GetAwaiter().GetResult(); //Not allowed to complete here :/ 
                            _messages.Remove(sessionId);
                        }
                    }
                }
            }
        }

        private async Task CompleteMessageList(ProcessSessionMessageEventArgs[] messages)
        {
            try
            {
                if (!_automcomplete) //Just for testing purposes
                {
                    for (int i = 0; i < messages.Length; i++)
                    {
                        await messages[i].CompleteMessageAsync(messages[i].Message);
                    }
                }
            }
            catch(Exception ex) 
            {
                //"ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
            }
        }
    }

    public class DateSorter : IComparer
    {

        // Calls CaseInsensitiveComparer.Compare with the parameters reversed.
        int IComparer.Compare(object x, object y)
        {
            DateTime a = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)x).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
            DateTime b = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)y).Message.ApplicationProperties["mikestimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);

            if (a > b)
                return 1;
            if (a < b)
                return -1;

            return 0;
        }
    }
}
Share Improve this question edited Mar 11 at 10:26 Mike asked Mar 11 at 10:02 MikeMike 1653 silver badges12 bronze badges 3
  • Your assumption is wrong. Docs clearly say: "Gets or sets a value that indicates whether the processor should automatically complete messages after the ProcessMessageAsync handler has completed processing." - So, if you set this true, the message will be closed as soon as MessageHandler is done executing as to my understanding. – Fildor Commented Mar 11 at 10:36
  • Honestly: I think you are in kind of a dead-end. If I had to fix this, I'd scrap all this and start new, beginning with the dumbest possible solution and working my way up to where I want to be. – Fildor Commented Mar 11 at 10:39
  • @Fildor If I set it to false, it will not be completed automatically when exiting the MessageHandler. Hence I should be able to control when I do it - otherwise that setting seem a bit useless... – Mike Commented Mar 11 at 10:41
Add a comment  | 

1 Answer 1

Reset to default 0

The errors occurs due the above code attempted to complete messages from a separate background timer .

But Azure Service Bus sessions are tied to a specific ServiceBusSessionProcessor, and once the session processing completes, the ServiceBusReceiver is closed automatically. So, when the timer tried to complete the messages later, the session was already closed, causing this error

Modify your dictionary:

public static IDictionary<string, (GroupedSession session, ProcessSessionMessageEventArgs args)> _messages 
    = new Dictionary<string, (GroupedSession, ProcessSessionMessageEventArgs)>();

Then store the session args along with messages:

lock (_lock)
{
    if (!_messages.ContainsKey(sessionId))
        _messages.Add(sessionId, (new GroupedSession 
        { 
            Timestamp = DateTime.Now,
            Messages = new List<ProcessSessionMessageEventArgs> { args }
        }, args));
    else
        _messages[sessionId].session.Messages.Add(args);
}

Then, when processing messages, use:

await _messages[sessionId].args.CompleteMessageAsync(message);

This ensures the completion happens while the receiver is still open.

You could extend the session lock before completing messages:

await args.RenewSessionLockAsync();

This ensures the session isn’t closed before you complete the messages.

Instead of completing messages from the timer event, enqueue them to a background processing task that runs within the session’s lifecycle:

ConcurrentQueue<ProcessSessionMessageEventArgs> _messageQueue = new ConcurrentQueue<ProcessSessionMessageEventArgs>();

Task.Run(async () =>
{
    while (true)
    {
        if (_messageQueue.TryDequeue(out var args))
        {
            await args.CompleteMessageAsync(args.Message);
        }
        await Task.Delay(100);
    }
});

I refer to these documents for sending and receiving session messages from a session-enabled Service Bus:

  • Sending & Receiving Session Messages
  • Session Processor

Please refer to this git link for the complete code.

Output:

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744802560a4594579.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信