用响应式扩展处理一批请求
本文关键字:一批 请求 响应 扩展 处理 | 更新日期: 2023-09-27 18:13:03
我正在学习响应式扩展,我一直在试图找出它是否适合这样的任务。
我有一个Process()方法,它将一批请求作为一个工作单元处理,并在所有请求完成时调用回调。
重要的是,每个请求将调用回调或同步或异步取决于它的实现,批处理处理器必须能够处理这两个。
但是没有线程从批处理程序启动,任何新的线程(或其他异步执行)将在必要时从请求处理程序内部启动。我不知道这是否符合rx的用例。
我当前的工作代码看起来(几乎)是这样的:
public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
{
IUnitOfWork uow = null;
try
{
uow = unitOfWorkFactory.Create();
var responses = new List<IResponse>();
var outstandingRequests = requests.Count;
foreach (var request in requests)
{
var correlationId = request.CorrelationId;
Action<IResponse> requestCallback = response =>
{
response.CorrelationId = correlationId;
responses.Add(response);
outstandingRequests--;
if (outstandingRequests != 0)
return;
uow.Commit();
onCompleted(responses);
};
requestProcessor.Process(request, requestCallback);
}
}
catch(Exception)
{
if (uow != null)
uow.Rollback();
}
if (uow != null)
uow.Commit();
}
如何使用rx实现这个?这合理吗?
请注意,即使存在尚未返回的异步请求,也要同步提交工作单元。
我的方法是两步走。
首先创建一个通用运算符,将Action<T, Action<R>>
变为Func<T, IObservable<R>>
:
public static class ObservableEx
{
public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
this Action<T, Action<R>> call)
{
if (call == null) throw new ArgumentNullException("call");
return t =>
{
var subject = new AsyncSubject<R>();
try
{
Action<R> callback = r =>
{
subject.OnNext(r);
subject.OnCompleted();
};
call(t, callback);
}
catch (Exception ex)
{
return Observable.Throw<R>(ex, Scheduler.ThreadPool);
}
return subject.AsObservable<R>();
};
}
}
接下来,将呼叫void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
变为IObservable<IResponse> Process(IObservable<IRequest> requests)
:
public IObservable<IResponse> Process(IObservable<IRequest> requests)
{
Func<IRequest, IObservable<IResponse>> rq2rp =
ObservableEx.FromAsyncCallbackPattern
<IRequest, IResponse>(requestProcessor.Process);
var query = (
from rq in requests
select rq2rp(rq)).Concat();
var uow = unitOfWorkFactory.Create();
var subject = new ReplaySubject<IResponse>();
query.Subscribe(
r => subject.OnNext(r),
ex =>
{
uow.Rollback();
subject.OnError(ex);
},
() =>
{
uow.Commit();
subject.OnCompleted();
});
return subject.AsObservable();
}
现在,这不仅异步运行处理,而且还确保了结果的正确顺序。
实际上,由于您从一个集合开始,您甚至可以这样做:
var rqs = requests.ToObservable();
var rqrps = rqs.Zip(Process(rqs),
(rq, rp) => new
{
Request = rq,
Response = rp,
});
那么你就会有一个可观察对象来配对每个请求/响应,而不需要CorrelationId
属性。
这是Rx的一部分天赋,因为您可以自由地以同步或异步方式返回结果:
public IObservable<int> AddNumbers(int a, int b) {
return Observable.Return(a + b);
}
public IObservable<int> AddNumbersAsync(int a, int b) {
return Observable.Start(() => a + b, Scheduler.NewThread);
}
它们都有IObservable类型,所以它们的工作方式是一样的。如果你想知道所有的IObservables是什么时候完成的,Aggregate会这样做,因为它会把Observable中的n个item变成1个 item,并在最后返回:
IObservable<int> listOfObservables[];
listObservables.ToObservable()
.Merge()
.Aggregate(0, (acc, x) => acc+1)
.Subscribe(x => Console.WriteLine("{0} items were run", x));