Azure ServiceBus &;异步-生存,还是毁灭

本文关键字:毁灭 -生存 异步 ServiceBus Azure | 更新日期: 2023-09-27 18:05:34

我在Azure上运行服务总线,每秒泵送大约10-100条消息

最近我已经切换到.net 4.5,并兴奋地重构了所有的代码,使'async'和'await'在每行至少两次,以确保它'正确'完成:)

现在我想知道它实际上是更好还是更坏。如果您能看一下代码片段并让我知道您的想法。我特别担心,如果线程上下文切换不是给我更多的痛苦比好处,从所有的异步…(看着!dumpheap,这绝对是一个因素)

只是一点描述-我将发布2个方法-一个在ConcurrentQueue上做while循环,等待新消息,另一个方法一次发送一个消息。我还按照Azure博士的要求使用了瞬态故障处理模块。

发送循环(从头开始,等待新消息):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();
            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;
            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);
                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;
        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }
        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码来自一个'Sender'类,它每秒发送1条消息。我有大约50-100个实例在任何给定的时间运行,所以它可能是相当多的线程。

顺便说一句,不要担心过多的ensureessagesender, RecreateMessageFactory, EnsureTopicExists,它们不经常被调用。

我不只是有一个后台线程通过消息队列工作和同步发送消息更好,提供所有我需要的是发送一个消息在同一时间,不担心异步的东西,避免开销随之而来。

请注意,通常发送一个消息到Azure服务总线是几毫秒的问题,它不是很昂贵。(除了有时它很慢,超时或服务总线后端有问题,它可能会挂起一段时间,试图发送东西)。

谢谢,很抱歉写了这么长一篇文章,

Stevo

解决方案

这个例子能解决我的情况吗?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();
        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       
                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;
                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;
                    Console.WriteLine("Sending " + val);
                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 
                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }, cancel.Token);
        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }
        cancel.Cancel();
        run.Wait();
    }

Azure ServiceBus &;异步-生存,还是毁灭

你说:

上面的代码来自一个'Sender'类,它每秒发送1条消息。我在任何给定时间运行大约50-100个实例,所以它可能是相当多的线程

这是一个async的好例子。你在这里节省了很多线程。Async 减少了上下文切换,因为它不是基于线程的。它不会在需要等待的情况下进行上下文切换。相反,下一个工作项将在同一个线程中处理(如果有的话)。

因此,你的异步解决方案肯定会比同步解决方案伸缩性更好。是否在50-100个工作流实例中使用更少的CPU需要测量。实例越多,异步速度越快的可能性就越大。

现在,实现有一个问题:您使用的ConcurrentQueue不是异步就绪的。所以你实际上使用50-100个线程,甚至在你的异步版本。它们要么会阻塞(这是您想要避免的),要么会忙着等待燃烧100%的CPU(这似乎是您实现的情况!)。您需要消除这个问题,并使队列异步化。也许SemaphoreSlim在这里是有帮助的,因为它可以异步等待。

首先,要记住Task != Thread。任务(和async方法延续)被调度到线程池中,微软在线程池中进行了大量的优化,只要你的任务相当短,就能创造奇迹。

回顾你的代码,有一行出现了一个标志:semaphore.WaitOne。我假设你用这个作为一种信号,表示队列中有可用的数据。这很糟糕,因为它是async方法中的阻塞等待。通过使用阻塞等待,代码从一个轻量级的延续变成了一个更重的线程池线程。

所以,我会遵循@usr的建议,用async就绪队列替换队列(和信号量)。TPL Dataflow的BufferBlock<T>是一个async就绪的生产者/消费者队列,可以通过NuGet获得。我首先推荐这个,因为听起来您的项目可以从更广泛地使用数据流而不仅仅是作为队列(但队列是一个很好的开始)中受益。

存在其他async就绪的数据结构;我的AsyncEx库有几个。自己建一个简单的也不难;我有一篇关于这个主题的博客文章。但是我推荐您使用TPL数据流。