用于异步处理作业的Azure Worker角色
本文关键字:Azure Worker 角色 作业 异步 处理 用于 | 更新日期: 2023-09-27 17:49:26
我正在尝试实现以下用例。我有一个Azure Worker Role,它将监视Azure Storage Queue,当消息传入时,它将触发一个作业异步运行。如果可能的话,我希望使用TPL,并且需要支持取消的操作,以便当Azure Role OnStop触发时,作业可以尽可能优雅地退出。Scott Guthrie发布的MyFixIt示例几乎正是我所需要的,我已经将其用作我项目的模板。不支持的一个关键方面是异步运行作业的需求。在FixIt代码中,一旦启动了一个作业,在该作业完成之前不会处理其他作业。我的应用程序将处理的一些作业是长时间运行的,我需要工作者角色能够注意到其他传入的作业,并在长时间运行的作业运行时运行这些作业。
这里的两个关键方法是ProcessMessagesAsync,它监视队列,以及ProcessMessage,它将在消息传入时运行作业。以下是我所拥有的,它主要工作,除了它不能正确处理CancellationRequest,并且Azure Worker Role将在不等待作业完成的情况下关闭。
/// <summary>
/// Continuous loop that monitors the queue and launches jobs when they are retrieved.
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task ProcessMessagesAsync(CancellationToken token)
{
CloudQueue queue = _queueClient.GetQueueReference(_queueName);
await queue.CreateIfNotExistsAsync(token);
while (!token.IsCancellationRequested)
{
Debug.WriteLine("inLoop");
// The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
// Pass in a cancellation token, because the operation can be long-running.
CloudQueueMessage message = await queue.GetMessageAsync(token);
if (message != null)
{
ProcessMessage(message, queue, token);
}
else
{
await Task.Delay(500, token);
}
}
}
protected virtual async Task ProcessMessage(CloudQueueMessage message, CloudQueue queue, CancellationToken token)
{
var jobDetails = JobDetails.DeserializeJson(message.AsString);
var result = await _jobRunner.RunJob(jobDetails, token);
//todo handle error
//if (result.Status == JobStatus.Error)
await queue.DeleteMessageAsync(message);
}
然后JobRunner运行请求的作业。我写了一个TestJob,我试图模拟一个长时间运行的作业,它可以注意到CancellationRequest,并在短暂的清理期后,提前退出作业。
public virtual async Task<JobResult> RunJob(JobDetails jobDetails, CancellationToken token)
{
switch (jobDetails.JobName.ToLower())
{
case "testjob":
return await TestJob(jobDetails.Args, token);
}
return new JobResult(JobStatus.Error) { ErrorMessage = "The job requested does not exist." };
}
protected virtual async Task<JobResult> TestJob(List<string> jobArgs, CancellationToken token)
{
var message = "no message";
if (jobArgs != null && jobArgs.Any())
message = jobArgs[0];
return await Task.Run(async () =>
{
Debug.WriteLine(string.Format("Start:{0}", message));
for (int i = 1; i <= 800; i++)
{
if (token.IsCancellationRequested)
{
Debug.WriteLine("CancelationRequest in TestJob");
//simulate short time to cleanup and exit early
Thread.Sleep(1500);
Debug.WriteLine("Cancelation Job Cleanup finsihed.");
token.ThrowIfCancellationRequested();
}
Thread.Sleep(10);
}
Debug.WriteLine(string.Format("Finish:{0}", message));
return new JobResult(JobStatus.Success);
});
}
我已经搜索和研究了2天了,包括TPL数据流库,还没有能够想出一个方法来使这个工作正常。我觉得对ProcessMessage(消息,队列,令牌)的调用没有正确完成,甚至有编译器警告"因为这个调用没有等待……"。但我不想等待(这是FixIt的例子所做的),因为没有其他的工作被注意到,直到运行的一个完成。这似乎不是一个不常见的用例,尽管我似乎找不到任何人描述它。
提前感谢您的帮助!
丹尼绿色
发生这种情况的原因是因为您没有执行从ProcessMessage返回的任务。因此,ProcessMessageAsync可以在ProcessMessage优雅地完成或取消之前完成。请记住,您不希望等待ProcessMessage,因为它将使消息处理顺序,因此我建议您保留一个正在运行的任务列表。换句话说,在ProcessMessageAsync中创建一个列表,并将从ProcessMessage返回的任务添加到该列表中。然后在while循环结束时,如果token被取消,则应该循环遍历此列表以取消所有挂起的任务。
很抱歉,我手边没有VS,但我希望你能明白。
谢谢你Sanjay,根据你的建议,我提出了以下建议。
/// <summary>
/// Continuous loop that monitors the queue and launches jobs when they are retrieved.
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task ProcessMessagesAsync(CancellationToken token)
{
CloudQueue queue = _queueClient.GetQueueReference(_queueName);
await queue.CreateIfNotExistsAsync(token);
var runningTasks = new ConcurrentDictionary<int, Task>();
while (!token.IsCancellationRequested)
{
Debug.WriteLine("inLoop");
// The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
// Pass in a cancellation token, because the operation can be long-running.
CloudQueueMessage message = await queue.GetMessageAsync(token);
if (message != null)
{
var t = ProcessMessage(message, queue, token);
var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));
while (true)
{
if (runningTasks.TryAdd(t.Id, t))
break;
Task.Delay(25);
}
}
else
{
try
{
await Task.Delay(500, token);
}
catch (Exception ex)
{
Debug.WriteLine(ex.Message);
}
}
}
while (!runningTasks.IsEmpty)
{
Debug.WriteLine("Waiting for running tasks");
Task.Delay(500);
}
}
private static void RemoveRunningTask(int id, ConcurrentDictionary<int, Task> runningTasks)
{
while (true)
{
Task outTask;
if (runningTasks.TryRemove(id, out outTask))
break;
Task.Delay(25);
}
}
这似乎工作,虽然我觉得它是有点笨拙。我开始像这样编码'ContinueWith',但惊讶的是,传入的任务有一个不同的Id值(我期望它是相同的任务):
var task = ProcessMessage(message, queue, token).ContinueWith(x =>
{
while (true)
{
Task outTask;
if (runningTasks.TryRemove(x.Id, out outTask))
break;
Task.Delay(25);
}
});
更新:事实证明,这仍然不太工作,我不知何故误读了测试时的结果。根据MyFixIt的例子,在工作角色OnStop中,我有以下代码:
public override void OnStop()
{
Debug.WriteLine("OnStop_Begin");
tokenSource.Cancel();
tokenSource.Token.WaitHandle.WaitOne();
base.OnStop();
Debug.WriteLine("Onstop_End");
tokenSource.Dispose();
}
看起来tokensource . token . waithhandle . waitone并不能真正等到所有对令牌有引用的任务都完成,所以角色继续并停止,即使任务仍处于结束的处理中。是否有一些方法可以正确地使用令牌来发出取消实际完成的信号?
谢谢!
更新2
好吧,我想我有一个解决方案,现在是有效的。看起来CancellationToken。调用。cancel时发出WaitHandle信号,所以我不确定在调用。cancel后立即使用它的目的是什么,似乎它总是会立即通过该代码继续?这就是FixIt示例中的情况,但我真的不理解它。出于我的目的,我已经将ProcessMessagesAsync更改为现在在ManualResetEventSlim中传递,然后在所有任务完成后设置它。然后在OnStop中,我在完成Stop之前等待。
/// <summary>
/// Continuous loop that monitors the queue and launches jobs when they are retrieved.
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public virtual async Task ProcessMessagesAsync(CancellationToken token, ManualResetEventSlim reset)
{
CloudQueue queue = _queueClient.GetQueueReference(_queueName);
await queue.CreateIfNotExistsAsync(token);
var runningTasks = new ConcurrentDictionary<int, Task>();
while (!token.IsCancellationRequested)
{
Debug.WriteLine("inLoop");
// The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
// Pass in a cancellation token, because the operation can be long-running.
CloudQueueMessage message = await queue.GetMessageAsync(token);
if (message != null)
{
var t = ProcessMessage(message, queue, token);
var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));
while (true)
{
if (runningTasks.TryAdd(t.Id, t))
break;
await Task.Delay(25);
}
}
else
{
try
{
await Task.Delay(500, token);
}
catch (Exception ex)
{
Debug.WriteLine(ex.Message);
}
}
}
while (!runningTasks.IsEmpty)
{
Debug.WriteLine("Waiting for running tasks");
await Task.Delay(500);
}
Debug.WriteLine("All tasks have finished, exiting ProcessMessagesAsync.");
reset.Set();
}
public override void OnStop()
{
Debug.WriteLine("OnStop_Begin");
tokenSource.Cancel();
tokenSource.Token.WaitHandle.WaitOne();
_reset.Wait();
base.OnStop();
Debug.WriteLine("Onstop_End");
tokenSource.Dispose();
}