类似WebJob的队列上的Azure WorkerRole触发器

本文关键字:WorkerRole 触发器 Azure 队列 WebJob 类似 | 更新日期: 2023-09-27 17:59:47

我习惯于在Azure上使用webjob来触发Azure队列。它就像一个符咒。

Azure教程webjob+队列

static void Main(string[] args)
{
    JobHost host = new JobHost();
    host.RunAndBlock();
}
public static void ProcessQueueMessage([QueueTrigger("logqueue")] string logMessage, TextWriter logger)
{
    logger.WriteLine(logMessage);
}

queueTrigger真正的好处是,在消息触发的过程没有完成之前,消息保持不可见(而不是删除)。因此,如果您关闭网络作业(例如,用于网络作业更新),则消息将在队列中可见(经过一点超时),由更新的网络作业处理(完美)。

现在我想做同样的事情,只是扮演一个工人的角色。今天我确实喜欢这个。

while (true)
{
     var cloudMessage = await sourceImportationQueue.GetMessageAsync();
     if (cloudMessage != null)
           sourceImportationQueue.DeleteMessage(cloudMessage);
      // process my job (few hours)
     else
           await Task.Delay(1000 * 5);
}

但如果我在工作期间阻止了那个工人,我就失去了信息。那么我该如何像webJob一样触发呢?

类似WebJob的队列上的Azure WorkerRole触发器

最后我找到了一个简单的解决方案。在运行几个小时的工作之前,我启动了一个任务KeepHiddenMessageAsync,该任务会随着超时更新消息。在超时结束之前,将对消息进行新的更新。如果出现问题,则将达到消息的超时时间,并且消息将变为可见。

        private bool jobIsComplete;
        private void Run()
        {
             while (true)
            {
                 jobIsComplete = false;
                 //get the message
                 var cloudMessage = await queue.GetMessageAsync();
                 if (cloudMessage != null)
                        //run the task to keep the message until end of the job and worker role stopping for an update for example 
                       var keepHiddenMessageTask = KeepHiddenMessageAsync(cloudMessage);
                        //
                        // process my job (few hours)
                        //
                      jobIsComplete = true;
                      await keepHiddenMessageTask;
                      await _queue.DeleteMessageAsync(cloudMessage);
                 else
                       await Task.Delay(1000 * 5);
            }
        }
        private async Task KeepHiddenMessageAsync(CloudQueueMessage iCloudQueueMessage)
        {
            while (true)
            {
                //Update message and hidding during 5 new minutes
                await _queue.UpdateMessageAsync(iCloudQueueMessage, TimeSpan.FromMinutes(5), MessageUpdateFields.Visibility);
                //Wait 4 minutes
                for (int i = 0; i < 60 * 4; i++)
                {
                    if (JobIsComplete)
                        return;
                    else
                        await Task.Delay(1000);
                }
            }
        }

默认情况下,一旦检索到队列消息,它将在5分钟内不可见。在此延迟之后,如果消息尚未从队列中删除,它将再次可见,以便可以再次处理。

在您的代码示例中,您将在从队列中获得消息后立即删除该消息。如果您想确保邮件的安全,则只应在流程结束时删除该邮件。您是否尝试在处理作业结束时移动sourceImportationQueue.DeleteMessage(cloudMessage);

如果不使用某种持久存储来跟踪您的工作进度,可能无法解决此问题。如前所述,您将在作业开始前删除消息,因此,如果作业因任何原因(包括停止角色)失败,则消息将丢失。消息的最长锁定期为5分钟,这意味着当作业仍在运行时,消息将再次出现,如果删除操作移到最后,则会因丢失锁定而失败。

如果您的长时间运行的作业由多个较小的步骤组成,其中没有一个步骤超过5分钟,则您可以定期调用RenewLock()来保留消息的锁定,并阻止消息再次出现在队列中。只要锁永远不会过期,那么在这种情况下,最后的DeleteMessage就会成功。不过,这可能不太可能符合您的情况。

一个可能的解决方案是在处理作业的整个过程中将作业状态写入Azure表和记录状态。您的工作者角色循环将检查表中是否有任何尚未完成的作业,并继续任何现有作业,如果没有找到,请检查服务总线中是否有新作业。这个解决方案还可以让你有机会从失败的工作开始,而不是从头开始你的2小时工作。