长时间运行作业的模式.带有事务的Azure服务总线

本文关键字:Azure 事务 服务 总线 运行 作业 模式 长时间 | 更新日期: 2023-09-27 18:07:17

我想在Azure服务总线上使用事务,但是我有一些消息是网络/API绑定的,我根本无法可靠地重构它们,使它们在不到5分钟内完成——这是允许的最大PeekLock持续时间。

我找不到任何允许我扩展锁的API,所以可能有另一种模式。

一个可能的解决方案:

1)使用现有实现接收消息。如果从主题/队列获取需要长时间运行的事务-使用新的ScheduledEnqueueTimeUtc更新消息并发送回服务总线。

myMessage.ScheduledEnqueueTimeUtc = TimeSpan.FromMinutes(actualLockDuration);
serviceBusClient.PublishMessage(topic, myMessage);

2)通过MessageId获取特定的消息,并将该新消息标记为完整。

if (oldMessage.LockedUntilUtc > DateTime.UtcNow) {
  var message = FetchMessage(oldMessage.MessageId);
  message.Complete();
} else {
  oldMessage.Complete();
}

在第二个想法,寻找一个API来获取消息的messageId -我没有看到一个。如果我通过序列Id获取,那么我需要一种方法来获得步骤1之后的序列Id -然后我需要重新考虑一些内部系统(大消息处理,消息日志和相关性等)

长时间运行作业的模式.带有事务的Azure服务总线

我不知道我怎么会错过这个。BrokeredMessage.RenewLock

我在它周围写了一个小的异步包装器来更新直到最大持续时间。

public static async Task<ProcessMessageReturn> RenewLockAfter(this Task<ProcessMessageReturn> processTask, BrokeredMessage message, int maxDuration)
{
    var ss = new SemaphoreSlim(2);
    var startTime = DateTime.UtcNow;
    var trackedTasks = new List<Task> {processTask};
    var timeoutCancellationTokenSource = new CancellationTokenSource();
    while (true)
    {
        ss.Wait(timeoutCancellationTokenSource.Token);
        if (startTime.AddMinutes(maxDuration) < DateTime.UtcNow)
        {
            var task = Task.Run(async () =>
            {
                await Task.Delay(TimeSpan.FromTicks(message.LockedUntilUtc.Ticks - DateTime.UtcNow.AddSeconds(30).Ticks), timeoutCancellationTokenSource.Token);
                await message.RenewLockAsync();
                ss.Release();
            }, timeoutCancellationTokenSource.Token);
            trackedTasks.Add(task);
        }

        var completedTask = await Task.WhenAny(trackedTasks);
        if (completedTask != processTask) continue;
        timeoutCancellationTokenSource.Cancel();
        return processTask.Result;
    }
}