长时间运行作业的模式.带有事务的Azure服务总线
本文关键字:Azure 事务 服务 总线 运行 作业 模式 长时间 | 更新日期: 2023-09-27 18:07:17
我想在Azure服务总线上使用事务,但是我有一些消息是网络/API绑定的,我根本无法可靠地重构它们,使它们在不到5分钟内完成——这是允许的最大PeekLock持续时间。
我找不到任何允许我扩展锁的API,所以可能有另一种模式。
一个可能的解决方案:
1)使用现有实现接收消息。如果从主题/队列获取需要长时间运行的事务-使用新的ScheduledEnqueueTimeUtc更新消息并发送回服务总线。
myMessage.ScheduledEnqueueTimeUtc = TimeSpan.FromMinutes(actualLockDuration);
serviceBusClient.PublishMessage(topic, myMessage);
2)通过MessageId获取特定的消息,并将该新消息标记为完整。
if (oldMessage.LockedUntilUtc > DateTime.UtcNow) {
var message = FetchMessage(oldMessage.MessageId);
message.Complete();
} else {
oldMessage.Complete();
}
在第二个想法,寻找一个API来获取消息的messageId -我没有看到一个。如果我通过序列Id获取,那么我需要一种方法来获得步骤1之后的序列Id -然后我需要重新考虑一些内部系统(大消息处理,消息日志和相关性等)
我不知道我怎么会错过这个。BrokeredMessage.RenewLock
我在它周围写了一个小的异步包装器来更新直到最大持续时间。
public static async Task<ProcessMessageReturn> RenewLockAfter(this Task<ProcessMessageReturn> processTask, BrokeredMessage message, int maxDuration)
{
var ss = new SemaphoreSlim(2);
var startTime = DateTime.UtcNow;
var trackedTasks = new List<Task> {processTask};
var timeoutCancellationTokenSource = new CancellationTokenSource();
while (true)
{
ss.Wait(timeoutCancellationTokenSource.Token);
if (startTime.AddMinutes(maxDuration) < DateTime.UtcNow)
{
var task = Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromTicks(message.LockedUntilUtc.Ticks - DateTime.UtcNow.AddSeconds(30).Ticks), timeoutCancellationTokenSource.Token);
await message.RenewLockAsync();
ss.Release();
}, timeoutCancellationTokenSource.Token);
trackedTasks.Add(task);
}
var completedTask = await Task.WhenAny(trackedTasks);
if (completedTask != processTask) continue;
timeoutCancellationTokenSource.Cancel();
return processTask.Result;
}
}