用于异步处理作业的Azure Worker角色

本文关键字:Azure Worker 角色 作业 异步 处理 用于 | 更新日期: 2023-09-27 17:49:26

我正在尝试实现以下用例。我有一个Azure Worker Role,它将监视Azure Storage Queue,当消息传入时,它将触发一个作业异步运行。如果可能的话,我希望使用TPL,并且需要支持取消的操作,以便当Azure Role OnStop触发时,作业可以尽可能优雅地退出。Scott Guthrie发布的MyFixIt示例几乎正是我所需要的,我已经将其用作我项目的模板。不支持的一个关键方面是异步运行作业的需求。在FixIt代码中,一旦启动了一个作业,在该作业完成之前不会处理其他作业。我的应用程序将处理的一些作业是长时间运行的,我需要工作者角色能够注意到其他传入的作业,并在长时间运行的作业运行时运行这些作业。

这里的两个关键方法是ProcessMessagesAsync,它监视队列,以及ProcessMessage,它将在消息传入时运行作业。以下是我所拥有的,它主要工作,除了它不能正确处理CancellationRequest,并且Azure Worker Role将在不等待作业完成的情况下关闭。

        /// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);
        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                ProcessMessage(message, queue, token);
            }
            else
            {
                await Task.Delay(500, token);
            }
        }
    }

    protected virtual async Task ProcessMessage(CloudQueueMessage message, CloudQueue queue, CancellationToken token)
    {
        var jobDetails = JobDetails.DeserializeJson(message.AsString);
        var result = await _jobRunner.RunJob(jobDetails, token);
        //todo handle error
        //if (result.Status == JobStatus.Error)
        await queue.DeleteMessageAsync(message);
    }

然后JobRunner运行请求的作业。我写了一个TestJob,我试图模拟一个长时间运行的作业,它可以注意到CancellationRequest,并在短暂的清理期后,提前退出作业。

    public virtual async Task<JobResult> RunJob(JobDetails jobDetails, CancellationToken token)
    {
        switch (jobDetails.JobName.ToLower())
        {
            case "testjob":
                return await TestJob(jobDetails.Args, token);
        }
        return new JobResult(JobStatus.Error) { ErrorMessage = "The job requested does not exist." };
    }
    protected virtual async Task<JobResult> TestJob(List<string> jobArgs, CancellationToken token)
    {
        var message = "no message";
        if (jobArgs != null && jobArgs.Any())
            message = jobArgs[0];
        return await Task.Run(async () =>
        {
            Debug.WriteLine(string.Format("Start:{0}", message));
            for (int i = 1; i <= 800; i++)
            {
                if (token.IsCancellationRequested)
                {
                    Debug.WriteLine("CancelationRequest in TestJob");
                    //simulate short time to cleanup and exit early
                    Thread.Sleep(1500);
                    Debug.WriteLine("Cancelation Job Cleanup finsihed.");
                    token.ThrowIfCancellationRequested();
                }
                Thread.Sleep(10);
            }
            Debug.WriteLine(string.Format("Finish:{0}", message));
            return new JobResult(JobStatus.Success);
        });
    }

我已经搜索和研究了2天了,包括TPL数据流库,还没有能够想出一个方法来使这个工作正常。我觉得对ProcessMessage(消息,队列,令牌)的调用没有正确完成,甚至有编译器警告"因为这个调用没有等待……"。但我不想等待(这是FixIt的例子所做的),因为没有其他的工作被注意到,直到运行的一个完成。这似乎不是一个不常见的用例,尽管我似乎找不到任何人描述它。

提前感谢您的帮助!

丹尼绿色

用于异步处理作业的Azure Worker角色

发生这种情况的原因是因为您没有执行从ProcessMessage返回的任务。因此,ProcessMessageAsync可以在ProcessMessage优雅地完成或取消之前完成。请记住,您不希望等待ProcessMessage,因为它将使消息处理顺序,因此我建议您保留一个正在运行的任务列表。换句话说,在ProcessMessageAsync中创建一个列表,并将从ProcessMessage返回的任务添加到该列表中。然后在while循环结束时,如果token被取消,则应该循环遍历此列表以取消所有挂起的任务。

很抱歉,我手边没有VS,但我希望你能明白。

谢谢你Sanjay,根据你的建议,我提出了以下建议。

      /// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);
        var runningTasks = new ConcurrentDictionary<int, Task>();
        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                var t = ProcessMessage(message, queue, token);
                var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));
                while (true)
                {
                    if (runningTasks.TryAdd(t.Id, t))
                        break;
                    Task.Delay(25);
                }                                    
            }                    
            else
            {
                try
                {
                    await Task.Delay(500, token);
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
        }
        while (!runningTasks.IsEmpty)
        {
            Debug.WriteLine("Waiting for running tasks");
            Task.Delay(500);
        }
    }
    private static void RemoveRunningTask(int id, ConcurrentDictionary<int, Task> runningTasks)
    {
        while (true)
        {
            Task outTask;
            if (runningTasks.TryRemove(id, out outTask))
                break;
            Task.Delay(25);
        }
    }

这似乎工作,虽然我觉得它是有点笨拙。我开始像这样编码'ContinueWith',但惊讶的是,传入的任务有一个不同的Id值(我期望它是相同的任务):

                    var task = ProcessMessage(message, queue, token).ContinueWith(x =>
                {
                    while (true)
                    {
                        Task outTask;
                        if (runningTasks.TryRemove(x.Id, out outTask))
                            break;
                        Task.Delay(25);
                    }
                });

更新:事实证明,这仍然不太工作,我不知何故误读了测试时的结果。根据MyFixIt的例子,在工作角色OnStop中,我有以下代码:

        public override void OnStop()
    {
        Debug.WriteLine("OnStop_Begin");
        tokenSource.Cancel();
        tokenSource.Token.WaitHandle.WaitOne();
        base.OnStop();
        Debug.WriteLine("Onstop_End");
        tokenSource.Dispose();
    }

看起来tokensource . token . waithhandle . waitone并不能真正等到所有对令牌有引用的任务都完成,所以角色继续并停止,即使任务仍处于结束的处理中。是否有一些方法可以正确地使用令牌来发出取消实际完成的信号?

谢谢!

更新2

好吧,我想我有一个解决方案,现在是有效的。看起来CancellationToken。调用。cancel时发出WaitHandle信号,所以我不确定在调用。cancel后立即使用它的目的是什么,似乎它总是会立即通过该代码继续?这就是FixIt示例中的情况,但我真的不理解它。出于我的目的,我已经将ProcessMessagesAsync更改为现在在ManualResetEventSlim中传递,然后在所有任务完成后设置它。然后在OnStop中,我在完成Stop之前等待。

       /// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token, ManualResetEventSlim reset)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);
        var runningTasks = new ConcurrentDictionary<int, Task>();
        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                var t = ProcessMessage(message, queue, token);
                var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));

                while (true)
                {
                    if (runningTasks.TryAdd(t.Id, t))
                        break;
                    await Task.Delay(25);
                }                                    
            }                    
            else
            {
                try
                {
                    await Task.Delay(500, token);
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
        }
        while (!runningTasks.IsEmpty)
        {
            Debug.WriteLine("Waiting for running tasks");
            await Task.Delay(500);
        }
        Debug.WriteLine("All tasks have finished, exiting ProcessMessagesAsync.");
        reset.Set();
    }
        public override void OnStop()
    {
        Debug.WriteLine("OnStop_Begin");
        tokenSource.Cancel();
        tokenSource.Token.WaitHandle.WaitOne();
        _reset.Wait();
        base.OnStop();
        Debug.WriteLine("Onstop_End");
        tokenSource.Dispose();
    }