并发运行不正确,提供的通道数超出预期

本文关键字:通道 不正确 运行 并发 | 更新日期: 2023-09-27 18:26:19

我有一个应用程序,它可以使任务同时运行。我们在这里设置MaxDegreeOfParallelism=4,这意味着在任何时候最多有4个任务同时运行。在这种情况下,我只有4个频道可用。否则为异常

无法获取通道

将被抛出。

每个任务将有一个OutboundDial实例,因此最多为4个实例。

 public class OutboundDial
 {
    private ChannelResource m_ChannelResource;
    private VoiceResource m_VoiceResource;
    private TelephonyServer m_TelephonyServer;
    private AppointmentReminderResult m_Result = new AppointmentReminderResult();
    public OutboundDial(TelephonyServer telephonyServer)
    {
        m_TelephonyServer = telephonyServer;
    }
    internal void RunScript(AppointmentReminder callData) 
    {
        try
        {
            try
            {
                m_ChannelResource = m_TelephonyServer.GetChannel();
                m_VoiceResource = m_ChannelResource.VoiceResource;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Could not get channel: {0}",ex.StackTrace);
                return;
            }
            // a long running process of I/O bound operation

生产者-消费者队列是

 public static BufferBlock<AppointmentReminder> m_Queue =
        new BufferBlock<AppointmentReminder>(new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4});

BufferBlock是TPL类。TelephontServer在一开始就初始化了。

public static TelephonyServer ts;
ts = new TelephonyServer(sIpaddress, "username", "password");

在消费者部分,我们有:

static async  Task Consumer()
{
    try
    {
        while (await m_Queue.OutputAvailableAsync())
        {
            m_Queue.TryReceive(4, ts); // MaxDegreeOfParallelism = 4
        }
    }

TryReceive是一种可拓方法。

 public static void TryReceive<T>(this BufferBlock<T> bufferBlock, int count, TelephonyServer ts) where T : AppointmentReminder
    {
        try
        {
            for (var i = 0; i < count; i++)
            {
                T item;
                if (bufferBlock.TryReceive(out item))
                {
                    Task t = Task.Run(() =>
                        {
                            OutboundDial d = new OutboundDial(ts);
                            d.RunScript<T>((T)item);
                        });
                }
                else
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.StackTrace);
        }
    }

我的问题是:我在生产者部分向队列添加了10个项目,并在构造函数处设置了一个断点。

我发现代码在构造函数中运行了10次,然后是10次RunScript,这表明10个任务一起运行,而不是4个。但我只想要4(最大并行度)。因此,我没有足够的可用频道,引发了一个异常。

为什么在我的扩展方法中并发运行不起作用?

并发运行不正确,提供的通道数超出预期

  1. BufferBlock并没有真正执行任何内容,因此指定其MaxDegreeOfParallelism是没有意义的。它之所以有效,是因为ExecutionDataflowBlockOptions继承自DataflowBlockOptions,而BufferBlock构造函数正是这样期望的。

  2. 您的Consumer()可以做到这一点:最多4个项目并执行它们,最多4个项并执行它们、最多4个条目并执行它们等等。由于您从不等待这些执行完成,因此您实际上并没有通过这种方式限制并行度。

  3. 如果您想将并行度限制为4,可以使用ActionBlock,而不是BufferBlockConsumer():的组合

    new ActionBlock<AppointmentReminder>(
        reminder => new OutboundDial(ts).RunScript(reminder),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4});
    

    这样做的目的是为每个提醒执行lambda,但同时最多执行4个,这似乎是您所要求的。虽然我不确定这是否是你所需要的,因为你似乎不会在使用后释放(处置)频道,用于下一次提醒。

问题出在哪里还不清楚,但似乎不是在方法之后调用了cinstructor。我建议您将构造函数代码更改为:

public OutboundDial(TelephonyServer telephonyServer)
{
    m_TelephonyServer = telephonyServer;
    Console.WriteLine(m_Telephonyserver);
}

然后你会看到构造函数是完整的。

此外,在RunScript中的每一行后面添加一些Console.WriteLine,其中包含有用的信息,然后您就会看到错误的来源。