从Parallel.ForEach循环内部调用异步方法时出现无效操作异常
本文关键字:无效 操作 异常 异步方法 ForEach Parallel 循环 内部 调用 | 更新日期: 2023-09-27 18:08:46
我继承了一个windows服务,该服务处理队列中的大量电子邮件。听起来很简单,抓取队列,发送电子邮件,如果SmtpClient.SendAsync没有从回调中返回错误,则在DB中标记电子邮件为已发送。。我正在线程上使用Semaphore来等待one,这样就可以对SMTP客户端的异步发送方法进行多次调用。这是我获得状态的唯一方法,根据Microsoft文档,它必须在另一个调用异步之前完成操作。所以现在是有趣的部分。我决定使用Parallel.ForEach来获得这样的队列。这个方法在Windows服务OnStart中调用。请注意,我曾尝试在一个单独的线程上调用此方法,但得到了相同的结果。
我在想,要么是A,由于我缺乏线程方面的知识,我错过了一些明显的东西,要么是什么东西出了问题。最有可能A.
private static void ProcessEmailQueue()
{
List<EmailQueue> emailQueue =
_repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();
Parallel.ForEach(emailQueue, message =>
{
_smtpMail.FromAddress = message.FromAddress;
_smtpMail.ToAddress = message.ToAddress;
_smtpMail.Subject = message.Subject;
_smtpMail.SendAsHtml = message.IsHtml > 0;
_smtpMail.MessageBody = message.MessageBody;
_smtpMail.UserToken = message.EmailQueueID;
bool sendStatus = _smtpMail.SendMessage();
// THIS BLOWS UP with InvalidOperation Exception
});
}
这是从循环中调用的SMTP方法。
public bool SendMessage()
{
mailSendSemaphore = new Semaphore(0, 10); // This is defined as private static Semaphore mailSendSemaphore;
try
{
var fromAddress = new MailAddress(FromAddress);
var toAddress = new MailAddress(ToAddress);
using (var mailMessage = new MailMessage(fromAddress, toAddress))
{
mailMessage.Subject = Subject;
mailMessage.IsBodyHtml = SendAsHtml;
mailMessage.Body = MessageBody;
Envelope = mailMessage;
smtp.SendCompleted += smtp_SendCompleted;
smtp.SendAsync(mailMessage, UserToken);
mailSendSemaphore.WaitOne();
return _mailSent;
}
}
catch (Exception exception)
{
_logger.Error(exception);
return _mailSent;
}
}
回拨Smtp发送
private void smtp_SendCompleted(object sender, AsyncCompletedEventArgs e)
{
if (e.Cancelled)
{
}
if (e.Error != null)
{
}
else
{
_mailSent = true;
}
mailSendSemaphore.Release(2);
}
这是例外,由于一些奇怪的原因,花了一些时间才得到它。
System.InvalidOperationException was unhandled by user code
消息=异步调用已在进行中。必须先完成或取消它,然后才能调用此方法。来源=系统StackTrace:位于System.Net.Mail.SmtpClient.SendAsync(MailMessage消息,对象userToken(位于SmtpMail.cs:line 71中的DFW.Infrastructure.Communications.SmtpMail.SendMessage((在Service1.cs:line 57中的EmaiProcessorService.EmailQueueService.b0(EmailQueue消息(位于System.Threading.Tasks.Parallel。<>c_DisplayClass2d 2.<ForEachWorker>b__23(Int32 i)
at System.Threading.Tasks.Parallel.<>c__DisplayClassf
1.b_c((内部异常:
我的等待时间似乎被System.Threading.Tasks.Parallel 抹杀了
好吧,现在我们已经得到了错误文本,它看起来相当清楚:
消息=异步调用已在进行中。必须先完成或取消它,然后才能调用此方法。
这与文件一致:
两个简单的选项:
-
创建固定数量的客户端和要发送的消息队列。让每个客户端在每次完成时从队列中获取一条消息,直到队列为空。
BlockingCollection<T>
对此有好处。 -
为每条消息创建一个新的
SmtpClient
。这可能会导致你在SMTP服务器上有效地发起DOS攻击,这并不理想。
老实说,当你只是在等待消息发送时,还不清楚为什么要使用SendAsync
。。。
我不清楚你为什么在这里使用Semaphore,但几乎可以肯定你使用错误了。您正在为每次对SendMessage
的调用创建一个新的信号量实例。此外,您对它调用WaitOne
一次,然后调用Release(2)
,所以最终您将获得比获得更多的发布。这可能就是导致InvalidOperationException
的原因。
并行处理电子邮件队列对您没有任何好处,因为您一次只能发送一条消息。而尝试在Parallel.Foreach
内部异步执行这项操作则是不必要的复杂操作。
您最好使用类似ThreadPool.QueueUserWorkItem
的东西,并使用一个一次发送一条消息的简单循环。
List<EmailQueue> emailQueue =
_repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();
ThreadPool.QueueUserWorkItem(ProcessEmailQueue, emailQueue);
void ProcessEmailQueue(object state)
{
List<EmailQueue> emailQueue = (List<EmailQueue>)state;
foreach (var message in EmailQueue)
{
// Format and send message here.
}
}
或者,您也可以使用Task
执行同样的操作。关键是,您只需要一个线程来顺序处理队列。由于一次不能发送多条消息,Parallel.ForEach
对您没有任何好处。
编辑:
如果您需要一次进行多次发送,您可能可以修改原始代码。首先,在类作用域初始化信号量:
private static Semaphore mailSendSemaphore = new Semaphore(10, 10);
然后,在您的SendMessage
方法中:
bool SendMessage()
{
// acquire semaphore. This will block until there's a slot available.
mailSendSemaphore.WaitOne();
try
{
// do all your processing here, including sending the message.
// use Send rather than SendAsync
}
finally
{
mailSendSemaphore.Release();
}
}
不需要使用SendAsync
。