Azure服务总线/服务结构消息未从队列中删除

本文关键字:服务 队列 删除 消息 总线 结构 Azure | 更新日期: 2023-09-27 18:06:07

我正在将计算引擎从Azure worker角色移动到Azure Service Fabric中。它的工作原理是监听服务总线上的消息,然后根据消息内容对消息进行处理。

当前,计算工作正常,但如果计算时间超过一分钟左右,则消息在完成后不会从队列中删除。在worker角色中,我们通过增加"AutoRenewTimeout"来解决这个问题。

var options = new OnMessageOptions { AutoComplete = true, AutoRenewTimeout = TimeSpan.FromMinutes(3) };
        _queueClient.OnMessage(OnMessage, options);

然而,使用"serviceffabric"。ServiceBus"nuget包,我不知道你会在哪里设置它。我使用演示项目作为参考来设置实际运行计算的无状态服务。下面是对CalculateService.cs的摘录,其中初始化了无状态服务。

internal sealed class CalculateService : StatelessService
{
    public CalculateService(StatelessServiceContext context)
        : base(context)
    { }
    /// <summary>
    /// Optional override to create listeners (e.g., TCP, HTTP) for this service replica to handle client or user requests.
    /// </summary>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
        yield return new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName), "StatelessService-ServiceBusQueueListener");
    }
}

internal sealed class Handler : AutoCompleteServiceBusMessageReceiver
{
    protected override Task ReceiveMessageImplAsync(BrokeredMessage message, CancellationToken cancellationToken)
    {
        ServiceEventSource.Current.ServiceMessage(_service, $"Handling queue message {message.MessageId}");
        var computeRole = new ExcelCompute();
        var rMessage = new RangeMessage();
        rMessage = message.GetBody<RangeMessage>();
        var result = computeRole.OnMessage(rMessage, message.MessageId); //returns true if the compute was successful (which it currently, always is)
        return Task.FromResult(result);
    }
 }

我确实尝试使用<BrokeredMessage> message.Complete();,但这是抛出消息锁错误。

Azure服务总线/服务结构消息未从队列中删除

  1. 获取软件包的最新版本(>= v3.5.0)。将CommunicationListener上的属性' MessageLockRenewTimeSpan '设置为小于锁定持续时间的值。(例如锁定持续时间为60秒的50秒)这允许一些时钟偏差。注意:默认情况下,这个属性是空的,这意味着没有自动更新锁。

此选项在处理批处理所需的时间超过锁定持续时间允许的情况下非常有效。

  • 可以使用BrokeredMessage。当处理所需的时间超过锁定时间时,使用RenewLock周期性地延长消息锁定。如果需要,您可以在单独的线程中执行此操作。可能是有用的信息
  • 当处理单个消息(批大小为1)所需的时间超过锁定持续时间时,此选项可以很好地工作。