如何使用c#任务并行库和IProducerConsumerCollection实现泛型回调

本文关键字:IProducerConsumerCollection 实现 泛型 回调 何使用 任务 并行 | 更新日期: 2023-09-27 18:04:56

我有一个向基于web的API提交请求的组件,但是必须对这些请求进行限制,以免违反API的数据限制。这意味着所有请求都必须通过一个队列来控制它们提交的速率,但是它们可以(也应该)并发执行以实现最大吞吐量。每个请求在将来完成时必须在某个时间点向调用代码返回一些数据。

我正在努力创建一个好的模型来处理数据的返回。

使用BlockingCollection,我不能只从Schedule方法返回Task<TResult>,因为排队和脱队进程在缓冲区的两端。因此,我创建了一个RequestItem<TResult>类型,其中包含Action<Task<TResult>>形式的回调。

这个想法是,一旦一个项目已经从队列中取出,回调可以与已启动的任务一起调用,但是我已经失去了泛型类型参数,我留下了反射和各种各样的讨厌(如果可能的话)。

例如:

public class RequestScheduler 
{
    private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();
    public RequestScheduler()
    {
        this.Start();
    }
    // This can't return Task<TResult>, so returns void.
    // Instead RequestItem is generic but this poses problems when adding to the queue
    public void Schedule<TResult>(RequestItem<TResult> request)
    {
        _queue.Add(request);
    }
    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                // I want to be able to use the original type parameters here
                // is there a nice way without reflection?
                // ProcessItem submits an HttpWebRequest
                Task.Factory.StartNew(() => ProcessItem(item))
                   .ContinueWith(t => { item.Callback(t); });
            }
        });
    }
    public void Stop()
    {
        _queue.CompleteAdding();
    }
}
public class RequestItem<TResult> : IRequestItem
{
    public IOperation<TResult> Operation { get; set; }
    public Action<Task<TResult>> Callback { get; set; }
}

我怎么能继续缓冲我的请求,但返回一个Task<TResult>的客户端当请求从缓冲区拉并提交给API?

如何使用c#任务并行库和IProducerConsumerCollection实现泛型回调

首先,您可以Schedule()返回Task<TResult>,您只需要使用TaskCompletionSource

其次,为了避免泛型问题,您可以将所有内容隐藏在(非泛型)Action s中。在Schedule()中,使用lambda创建一个您所需要的操作。然后消费循环将执行该动作,它不需要知道里面是什么。

第三,我不明白为什么你要在循环的每次迭代中开始一个新的Task。首先,这意味着你实际上不会得到任何节流。

通过这些修改,代码看起来像这样:
public class RequestScheduler
{
    private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();
    public RequestScheduler()
    {
        this.Start();
    }
    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var action in m_queue.GetConsumingEnumerable())
            {
                action();
            }
        }, TaskCreationOptions.LongRunning);
    }
    public Task<TResult> Schedule<TResult>(IOperation<TResult> operation)
    {
        var tcs = new TaskCompletionSource<TResult>();
        Action action = () =>
        {
            try
            {
                tcs.SetResult(ProcessItem(operation));
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
        };
        m_queue.Add(action);
        return tcs.Task;
    }
    private T ProcessItem<T>(IOperation<T> operation)
    {
        // whatever
    }
}
相关文章:
  • 没有找到相关文章