并发运行不正确,提供的通道数超出预期
本文关键字:通道 不正确 运行 并发 | 更新日期: 2023-09-27 18:26:19
我有一个应用程序,它可以使任务同时运行。我们在这里设置MaxDegreeOfParallelism=4
,这意味着在任何时候最多有4个任务同时运行。在这种情况下,我只有4个频道可用。否则为异常
无法获取通道
将被抛出。
每个任务将有一个OutboundDial
实例,因此最多为4个实例。
public class OutboundDial
{
private ChannelResource m_ChannelResource;
private VoiceResource m_VoiceResource;
private TelephonyServer m_TelephonyServer;
private AppointmentReminderResult m_Result = new AppointmentReminderResult();
public OutboundDial(TelephonyServer telephonyServer)
{
m_TelephonyServer = telephonyServer;
}
internal void RunScript(AppointmentReminder callData)
{
try
{
try
{
m_ChannelResource = m_TelephonyServer.GetChannel();
m_VoiceResource = m_ChannelResource.VoiceResource;
}
catch (Exception ex)
{
Console.WriteLine("Could not get channel: {0}",ex.StackTrace);
return;
}
// a long running process of I/O bound operation
生产者-消费者队列是
public static BufferBlock<AppointmentReminder> m_Queue =
new BufferBlock<AppointmentReminder>(new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4});
BufferBlock是TPL类。TelephontServer在一开始就初始化了。
public static TelephonyServer ts;
ts = new TelephonyServer(sIpaddress, "username", "password");
在消费者部分,我们有:
static async Task Consumer()
{
try
{
while (await m_Queue.OutputAvailableAsync())
{
m_Queue.TryReceive(4, ts); // MaxDegreeOfParallelism = 4
}
}
TryReceive
是一种可拓方法。
public static void TryReceive<T>(this BufferBlock<T> bufferBlock, int count, TelephonyServer ts) where T : AppointmentReminder
{
try
{
for (var i = 0; i < count; i++)
{
T item;
if (bufferBlock.TryReceive(out item))
{
Task t = Task.Run(() =>
{
OutboundDial d = new OutboundDial(ts);
d.RunScript<T>((T)item);
});
}
else
{
break;
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.StackTrace);
}
}
我的问题是:我在生产者部分向队列添加了10个项目,并在构造函数处设置了一个断点。
我发现代码在构造函数中运行了10次,然后是10次RunScript
,这表明10个任务一起运行,而不是4个。但我只想要4(最大并行度)。因此,我没有足够的可用频道,引发了一个异常。
为什么在我的扩展方法中并发运行不起作用?
-
BufferBlock
并没有真正执行任何内容,因此指定其MaxDegreeOfParallelism
是没有意义的。它之所以有效,是因为ExecutionDataflowBlockOptions
继承自DataflowBlockOptions
,而BufferBlock
构造函数正是这样期望的。 -
您的
Consumer()
可以做到这一点:最多4个项目并执行它们,最多4个项并执行它们、最多4个条目并执行它们等等。由于您从不等待这些执行完成,因此您实际上并没有通过这种方式限制并行度。 -
如果您想将并行度限制为4,可以使用
ActionBlock
,而不是BufferBlock
和Consumer()
:的组合new ActionBlock<AppointmentReminder>( reminder => new OutboundDial(ts).RunScript(reminder), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4});
这样做的目的是为每个提醒执行lambda,但同时最多执行4个,这似乎是您所要求的。虽然我不确定这是否是你所需要的,因为你似乎不会在使用后释放(处置)频道,用于下一次提醒。
问题出在哪里还不清楚,但似乎不是在方法之后调用了cinstructor。我建议您将构造函数代码更改为:
public OutboundDial(TelephonyServer telephonyServer)
{
m_TelephonyServer = telephonyServer;
Console.WriteLine(m_Telephonyserver);
}
然后你会看到构造函数是完整的。
此外,在RunScript中的每一行后面添加一些Console.WriteLine,其中包含有用的信息,然后您就会看到错误的来源。