从Parallel.ForEach循环内部调用异步方法时出现无效操作异常

本文关键字:无效 操作 异常 异步方法 ForEach Parallel 循环 内部 调用 | 更新日期: 2023-09-27 18:08:46

我继承了一个windows服务,该服务处理队列中的大量电子邮件。听起来很简单,抓取队列,发送电子邮件,如果SmtpClient.SendAsync没有从回调中返回错误,则在DB中标记电子邮件为已发送。。我正在线程上使用Semaphore来等待one,这样就可以对SMTP客户端的异步发送方法进行多次调用。这是我获得状态的唯一方法,根据Microsoft文档,它必须在另一个调用异步之前完成操作。所以现在是有趣的部分。我决定使用Parallel.ForEach来获得这样的队列。这个方法在Windows服务OnStart中调用。请注意,我曾尝试在一个单独的线程上调用此方法,但得到了相同的结果。

我在想,要么是A,由于我缺乏线程方面的知识,我错过了一些明显的东西,要么是什么东西出了问题。最有可能A.

 private static void ProcessEmailQueue()
    {
        List<EmailQueue> emailQueue =
            _repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();
        Parallel.ForEach(emailQueue, message =>
                                         {
                                             _smtpMail.FromAddress = message.FromAddress;
                                             _smtpMail.ToAddress = message.ToAddress;
                                             _smtpMail.Subject = message.Subject;
                                             _smtpMail.SendAsHtml = message.IsHtml > 0;
                                             _smtpMail.MessageBody = message.MessageBody;
                                             _smtpMail.UserToken = message.EmailQueueID;
                                             bool sendStatus = _smtpMail.SendMessage();
                                                 // THIS BLOWS UP with InvalidOperation Exception
                                         });
    }

这是从循环中调用的SMTP方法。

public bool SendMessage()
    {
        mailSendSemaphore = new Semaphore(0, 10); // This is defined as  private static Semaphore mailSendSemaphore;
        try
        {
            var fromAddress = new MailAddress(FromAddress);
            var toAddress = new MailAddress(ToAddress);
            using (var mailMessage = new MailMessage(fromAddress, toAddress))
            {
                mailMessage.Subject = Subject;
                mailMessage.IsBodyHtml = SendAsHtml;
                mailMessage.Body = MessageBody;
                Envelope = mailMessage;
                smtp.SendCompleted += smtp_SendCompleted;
                smtp.SendAsync(mailMessage, UserToken);
                mailSendSemaphore.WaitOne();
                return _mailSent;
            }
        }
        catch (Exception exception)
        {
            _logger.Error(exception);
            return _mailSent;
        }
    }

回拨Smtp发送

 private void smtp_SendCompleted(object sender, AsyncCompletedEventArgs e)
    {
        if (e.Cancelled)
        {
        }
        if (e.Error != null)
        {
        }
        else
        {
            _mailSent = true;
        }
        mailSendSemaphore.Release(2);
    }

这是例外,由于一些奇怪的原因,花了一些时间才得到它。

System.InvalidOperationException was unhandled by user code

消息=异步调用已在进行中。必须先完成或取消它,然后才能调用此方法。来源=系统StackTrace:位于System.Net.Mail.SmtpClient.SendAsync(MailMessage消息,对象userToken(位于SmtpMail.cs:line 71中的DFW.Infrastructure.Communications.SmtpMail.SendMessage((在Service1.cs:line 57中的EmaiProcessorService.EmailQueueService.b0(EmailQueue消息(位于System.Threading.Tasks.Parallel。<>c_DisplayClass2d 2.<ForEachWorker>b__23(Int32 i) at System.Threading.Tasks.Parallel.<>c__DisplayClassf 1.b_c((内部异常:

我的等待时间似乎被System.Threading.Tasks.Parallel 抹杀了

从Parallel.ForEach循环内部调用异步方法时出现无效操作异常

好吧,现在我们已经得到了错误文本,它看起来相当清楚:

消息=异步调用已在进行中。必须先完成或取消它,然后才能调用此方法。

这与文件一致:

两个简单的选项:

  • 创建固定数量的客户端和要发送的消息队列。让每个客户端在每次完成时从队列中获取一条消息,直到队列为空。BlockingCollection<T>对此有好处。

  • 为每条消息创建一个新的SmtpClient。这可能会导致你在SMTP服务器上有效地发起DOS攻击,这并不理想。

老实说,当你只是在等待消息发送时,还不清楚为什么要使用SendAsync。。。

我不清楚你为什么在这里使用Semaphore,但几乎可以肯定你使用错误了。您正在为每次对SendMessage的调用创建一个新的信号量实例。此外,您对它调用WaitOne一次,然后调用Release(2),所以最终您将获得比获得更多的发布。这可能就是导致InvalidOperationException的原因。

并行处理电子邮件队列对您没有任何好处,因为您一次只能发送一条消息。而尝试在Parallel.Foreach内部异步执行这项操作则是不必要的复杂操作。

您最好使用类似ThreadPool.QueueUserWorkItem的东西,并使用一个一次发送一条消息的简单循环。

List<EmailQueue> emailQueue =
    _repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();
ThreadPool.QueueUserWorkItem(ProcessEmailQueue, emailQueue);
void ProcessEmailQueue(object state)
{
    List<EmailQueue> emailQueue = (List<EmailQueue>)state;
    foreach (var message in EmailQueue)
    {
        // Format and send message here.
    }
}

或者,您也可以使用Task执行同样的操作。关键是,您只需要一个线程来顺序处理队列。由于一次不能发送多条消息,Parallel.ForEach对您没有任何好处。

编辑:

如果您需要一次进行多次发送,您可能可以修改原始代码。首先,在类作用域初始化信号量:

private static Semaphore mailSendSemaphore = new Semaphore(10, 10);

然后,在您的SendMessage方法中:

bool SendMessage()
{
    // acquire semaphore. This will block until there's a slot available.
    mailSendSemaphore.WaitOne();
    try
    {
        // do all your processing here, including sending the message.
        // use Send rather than SendAsync
    }
    finally
    {
        mailSendSemaphore.Release();
    }
}

不需要使用SendAsync