使用Rx对下一个消息发送进行响应门控
本文关键字:响应 Rx 下一个 消息 使用 | 更新日期: 2023-09-27 18:09:30
给定List<Message>
,我用Send(message)
发出第一条消息。现在我想等待(异步)响应回来之前,我发出下一个消息…
阻塞直到通知'old' way
我知道如何实现基于事件的解决方案的这种情况下,使用线程锁定/与Monitor.Wait
和Monitor.Pulse
反应性"新"方式?
但我想知道它是否会有意义利用响应式扩展在这里?
如果Rx在这里传达了有价值的好处,那么我如何使响应响应地屏蔽下一个发送调用?显然,这将涉及IObservable
,可能两个作为主要来源,但然后呢?
这个问题不是很具体,似乎很一般,因为你没有提到什么是发送者和接收者等,所以答案也会很一般:)
var receiveObs = //You have created a observable around the receive mechanism
var responses = messages.Select(m => {
send(m);
return receiveObs.First();
}).ToList();
我认为Rx是一个很好的选择,但我认为我可能在您的要求中遗漏了一些东西。在我看来,Rx提供了一个非常简单的解决方案。
如果你已经有了一个消息列表,那么你可以像这样发送它们:
messages
.ToObservable()
.ObserveOn(Scheduler.ThreadPool)
.Subscribe(m =>
{
Send(m);
});
这会将对Send
的调用推到线程池中,并且根据可观察对象的内置行为,每次对Send
的调用都会等待直到上一个调用完成。
因为这些都发生在不同的线程上,所以你的代码是非阻塞的。
Rx的额外好处是您不需要更改Send
方法的行为或签名来使其工作。
简单,嗯?
我测试了这个,它工作得很好,因为我理解你的问题。这就是你想要的,还是我错过了什么?
我不确定Rx是否适合这里。Rx基于"推式收集"的概念,即向消费者推送数据而不是提取数据。您需要的是拉出第一个项目,异步发送它,并在异步操作完成后继续处理下一个元素。对于这种工作,完美的工具是async/await*!
async void SendMessages(List<Message> messages)
{
foreach (Message message in messages)
{
await SendAsync(message);
}
}
Task SendAsync(Message message);
* 在Async CTP或。net 4.5预览版中可用
假设您的Send
方法遵循APM模型,那么以下方法应该可以为您工作
List<Message> messages;
IObservable<Response> xs;
xs = messages.ToObservable().SelectMany(msg => Observable.FromAsyncPattern(Send, msg));
编辑-这不会像安德森建议的那样工作,这里有一个例子显示问题
Func<int,string> Send = (ii) => { "in Send".Dump(); Thread.Sleep(2000); return ii.ToString(); };
Func<int,IObservable<string>> sendIO = Observable.FromAsyncPattern<int,string>(Send.BeginInvoke, Send.EndInvoke);
(new [] { 1, 2, 3 }).ToObservable().SelectMany(sendIO).Dump();