如何使用c#任务并行库和IProducerConsumerCollection实现泛型回调
本文关键字:IProducerConsumerCollection 实现 泛型 回调 何使用 任务 并行 | 更新日期: 2023-09-27 18:04:56
我有一个向基于web的API提交请求的组件,但是必须对这些请求进行限制,以免违反API的数据限制。这意味着所有请求都必须通过一个队列来控制它们提交的速率,但是它们可以(也应该)并发执行以实现最大吞吐量。每个请求在将来完成时必须在某个时间点向调用代码返回一些数据。
我正在努力创建一个好的模型来处理数据的返回。
使用BlockingCollection
,我不能只从Schedule
方法返回Task<TResult>
,因为排队和脱队进程在缓冲区的两端。因此,我创建了一个RequestItem<TResult>
类型,其中包含Action<Task<TResult>>
形式的回调。
这个想法是,一旦一个项目已经从队列中取出,回调可以与已启动的任务一起调用,但是我已经失去了泛型类型参数,我留下了反射和各种各样的讨厌(如果可能的话)。
例如:public class RequestScheduler
{
private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();
public RequestScheduler()
{
this.Start();
}
// This can't return Task<TResult>, so returns void.
// Instead RequestItem is generic but this poses problems when adding to the queue
public void Schedule<TResult>(RequestItem<TResult> request)
{
_queue.Add(request);
}
private void Start()
{
Task.Factory.StartNew(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
// I want to be able to use the original type parameters here
// is there a nice way without reflection?
// ProcessItem submits an HttpWebRequest
Task.Factory.StartNew(() => ProcessItem(item))
.ContinueWith(t => { item.Callback(t); });
}
});
}
public void Stop()
{
_queue.CompleteAdding();
}
}
public class RequestItem<TResult> : IRequestItem
{
public IOperation<TResult> Operation { get; set; }
public Action<Task<TResult>> Callback { get; set; }
}
我怎么能继续缓冲我的请求,但返回一个Task<TResult>
的客户端当请求从缓冲区拉并提交给API?
首先,您可以从Schedule()
返回Task<TResult>
,您只需要使用TaskCompletionSource
。
其次,为了避免泛型问题,您可以将所有内容隐藏在(非泛型)Action
s中。在Schedule()
中,使用lambda创建一个您所需要的操作。然后消费循环将执行该动作,它不需要知道里面是什么。
第三,我不明白为什么你要在循环的每次迭代中开始一个新的Task
。首先,这意味着你实际上不会得到任何节流。
public class RequestScheduler
{
private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();
public RequestScheduler()
{
this.Start();
}
private void Start()
{
Task.Factory.StartNew(() =>
{
foreach (var action in m_queue.GetConsumingEnumerable())
{
action();
}
}, TaskCreationOptions.LongRunning);
}
public Task<TResult> Schedule<TResult>(IOperation<TResult> operation)
{
var tcs = new TaskCompletionSource<TResult>();
Action action = () =>
{
try
{
tcs.SetResult(ProcessItem(operation));
}
catch (Exception e)
{
tcs.SetException(e);
}
};
m_queue.Add(action);
return tcs.Task;
}
private T ProcessItem<T>(IOperation<T> operation)
{
// whatever
}
}