使用Rx对下一个消息发送进行响应门控

本文关键字:响应 Rx 下一个 消息 使用 | 更新日期: 2023-09-27 18:09:30

给定List<Message>,我用Send(message)发出第一条消息。现在我想等待(异步)响应回来之前,我发出下一个消息…

阻塞直到通知'old' way

我知道如何实现基于事件的解决方案的这种情况下,使用线程锁定/与Monitor.WaitMonitor.Pulse

反应性"新"方式?

但我想知道它是否会有意义利用响应式扩展在这里?

如果Rx在这里传达了有价值的好处,那么我如何使响应响应地屏蔽下一个发送调用?显然,这将涉及IObservable,可能两个作为主要来源,但然后呢?

使用Rx对下一个消息发送进行响应门控

这个问题不是很具体,似乎很一般,因为你没有提到什么是发送者和接收者等,所以答案也会很一般:)

var receiveObs = //You have created a observable around the receive mechanism 
var responses = messages.Select(m => {
   send(m);
   return receiveObs.First();
}).ToList();

我认为Rx是一个很好的选择,但我认为我可能在您的要求中遗漏了一些东西。在我看来,Rx提供了一个非常简单的解决方案。

如果你已经有了一个消息列表,那么你可以像这样发送它们:

messages
    .ToObservable()
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(m =>
    {
        Send(m);
    });

这会将对Send的调用推到线程池中,并且根据可观察对象的内置行为,每次对Send的调用都会等待直到上一个调用完成。

因为这些都发生在不同的线程上,所以你的代码是非阻塞的。

Rx的额外好处是您不需要更改Send方法的行为或签名来使其工作。

简单,嗯?

我测试了这个,它工作得很好,因为我理解你的问题。这就是你想要的,还是我错过了什么?

我不确定Rx是否适合这里。Rx基于"推式收集"的概念,即向消费者推送数据而不是提取数据。您需要的是拉出第一个项目,异步发送它,并在异步操作完成后继续处理下一个元素。对于这种工作,完美的工具是async/await*!

async void SendMessages(List<Message> messages)
{
    foreach (Message message in messages)
    {
        await SendAsync(message);
    }
}

Task SendAsync(Message message);

* 在Async CTP或。net 4.5预览版中可用

假设您的Send方法遵循APM模型,那么以下方法应该可以为您工作

List<Message> messages;
IObservable<Response> xs;
xs =  messages.ToObservable().SelectMany(msg => Observable.FromAsyncPattern(Send, msg));

编辑-这不会像安德森建议的那样工作,这里有一个例子显示问题

Func<int,string> Send = (ii) => { "in Send".Dump(); Thread.Sleep(2000); return ii.ToString(); };
Func<int,IObservable<string>> sendIO =  Observable.FromAsyncPattern<int,string>(Send.BeginInvoke, Send.EndInvoke);
(new [] { 1, 2, 3 }).ToObservable().SelectMany(sendIO).Dump();