同时处理单个和批处理请求的体系结构

本文关键字:请求 体系结构 批处理 处理 单个 | 更新日期: 2023-09-27 18:23:44

我有一个WCF服务托管在Windows服务中。此服务公开了2种方法:

  1. bool ProcessClaim(string options, ref string xml);将一些数据作为输入,进行一些处理(包括IO绑定操作,如DB查询),并返回结果
  2. void RunJob(string ticket);立即返回。根据ticket,从存储器(例如DB或文件系统)读取输入数据,对每个数据元素进行相同的处理,并将结果保存回存储器。批次通常由许多索赔组成

用户可以调用ProcessClaim来处理单个请求,调用RunJob来运行批处理。多个批处理可以同时运行。每个处理请求都封装为Task,因此所有请求都是并行执行的。问题是不允许批处理通过调度大量请求来阻塞处理队列。换句话说,如果用户执行大批量,它将在相当长的时间内阻止小批量和单个处理请求。因此,我提出了以下模式,Albahari对此进行了很好的描述(非常简短):

public sealed class ProcessingQueue : IDisposable
{
    private class WorkItem
    {
        public readonly TaskCompletionSource<string> TaskSource;
        public readonly string Options;
        public readonly string Claim;
        public readonly CancellationToken? CancelToken;
        public WorkItem(
            TaskCompletionSource<string> taskSource,
            string options,
            string claim,
            CancellationToken? cancelToken)
        {
            TaskSource = taskSource;
            Options = options;
            Claim = claim;
            CancelToken = cancelToken;
        }
    }
    public ProcessingQueue()
        : this(Environment.ProcessorCount)
    {
    }
    public ProcessingQueue(int workerCount)
    {
        _taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
        for (var i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume);
    }
    public void Dispose()
    {
        _taskQ.CompleteAdding();
    }
    private readonly BlockingCollection<WorkItem> _taskQ;
    public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
    {
        var tcs = new TaskCompletionSource<string>();
        _taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
        return tcs.Task;
    }
    public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
    {
        return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
    }
    private void Consume()
    {
        foreach (var workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
                workItem.TaskSource.SetCanceled();
            else
            {
                try
                {
                    workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }
    private static string ProcessItem(string options, string claim)
    {
        // do some actual work here
        Thread.Sleep(2000); // simulate work;
        return options + claim; // return final result
    }
}

静态方法ProcessRequest可用于处理单个请求,而实例方法EnqueueTask用于批处理。当然,所有批都必须使用ProcessingQueue的单个共享实例。尽管这种方法效果很好,可以控制多个批次同时运行的速度,但在我看来有一些问题:

  • 必须手动维护一个工作线程池
  • 很难猜测工作线程的最佳数量(我默认使用处理器内核的数量)
  • 当没有批处理运行时,线程束仍然被阻塞,浪费系统资源
  • 处理块工作线程的IO绑定部分降低了CPU使用效率

我想知道,有没有更好的方法来处理这种情况?

更新:其中一个要求是为批处理提供全功率,这意味着当用户执行一个批处理,并且没有其他传入请求时,所有资源都必须专门用于处理该批处理。

同时处理单个和批处理请求的体系结构

我想说,使用单个服务接口和单个托管容器来处理这两种截然不同的需求可能是错误的。

您应该将服务解耦为两个部分——一个根据需要返回对单个请求的响应,另一个将批处理查询排队并在单个线程上处理它们。

通过这种方式,您可以为实时消费者提供高可用性渠道,并为批量消费者提供离线渠道。这些可以作为单独的关注点进行部署和管理,允许您在每个服务接口上提供不同的服务级别。

只是我对拟建建筑的看法。

更新

事实是,您的卷处理通道是一个脱机通道。这意味着消费者将不得不排队等待,并且他们的请求返回的时间不确定。

那么,排个工作队怎么样?每个作业在处理过程中都会获得所有可用资源。一旦处理了作业,调用方就会收到作业已完成的通知。