合并延迟的可观察对象Udp

本文关键字:对象 Udp 观察 延迟 合并 | 更新日期: 2023-09-27 18:00:49

我正试图将一组延迟的可观察(源自UDP.RecieveAsync(调用合并为一个我也可以订阅的可观察。我是反应式扩展的新手,我确信我在deffering方面做得不对。

Log.Information("InboundUdpListener is starting");
IObservable<UdpReceiveResult> receiveStream = null;
IList<IObservable<UdpReceiveResult>> receivingStreams = new List<IObservable<UdpReceiveResult>>();
foreach (var devicePortMapping in _deviceTypeMapper.GetDeviceTypes())
{
      Log.Information("InboundUdpListener is starting for DeviceType: {DeviceType}, Port: {Port}",
      devicePortMapping.DeviceType, devicePortMapping.Port);
      var client = new UdpClient(devicePortMapping.Port);
      receivingStreams.Add(Observable.Defer(() => client
          .ReceiveAsync()
          .ToObservable())
          .Repeat());
      _clients.Add(client);
}
receiveStream = receivingStreams.Merge();
_listener = receiveStream.Subscribe(async r =>
{
    Log.Information("InboundUdpListener received {BytesReceived} bytes from IPAddress : {IPAddress}, Port : {Port}", r.Buffer.Length, r.RemoteEndPoint.Address.MapToIPv4(),r.RemoteEndPoint.Port);
    var message = new IncomingMessage(r.RemoteEndPoint, r.Buffer);
    var deviceTypeMap = _deviceTypeMapper.GetDeviceType(message);
    message.DeviceType = deviceTypeMap?.DeviceType ?? DeviceTypeEnum.UnIdentified;
    Log.Information("InboundUdpListener is publishing message {@Message}", message);
    await _messagePublisher.Publish(message);
});
Log.Information("InboundUdpListener is started");

合并延迟的可观察对象Udp

您肯定是在以程序的方式思考,而不是以函数的方式思考。

您需要尝试将处理保持在可观测范围内,并避免foreach和可观测的临时列表。

您也在使用UdpClient来产生您的值——这个对象是一次性的,所以您的可观察对象应该为您管理它的生命周期。使用Observable.Using可以做到这一点。

此外,异步方法可以与Observable.FromAsync一起使用,所以您也应该使用它。

所以,考虑到所有这些,你的receiveStream可能应该是这样的:

IObservable<UdpReceiveResult> receiveStream =
    from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
    from stream in
        Observable
            .Using(
                () => new UdpClient(devicePortMapping.Port),
                client =>
                    Observable
                        .FromAsync(() => client.ReceiveAsync())
                        .Repeat())
    select stream;

现在,考虑到我在你的订阅电话中看到的情况,你可能可以更进一步,这样做:

IObservable<UdpReceiveResult> receiveStream =
    from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
    from stream in
        Observable
            .Using(
                () => new UdpClient(devicePortMapping.Port),
                client =>
                    Observable
                        .FromAsync(() => client.ReceiveAsync())
                        .Repeat())
    select new IncomingMessage(stream.RemoteEndPoint, stream.Buffer)
    {
        DeviceType = devicePortMapping
    };

现在,这意味着您可以访问查询本身中的原始设备类型,因此无需查找它——如果我正确理解了您的代码在做什么的话。

如果确实需要进行查找,则应将其作为查询的一部分进行查找。试着这样做:

IObservable<UdpReceiveResult> receiveStream =
    from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
    from stream in
        Observable
            .Using(
                () => new UdpClient(devicePortMapping.Port),
                client =>
                    Observable
                        .FromAsync(() => client.ReceiveAsync())
                        .Repeat())
    from message in Observable.Start(() =>
    {
        var message = new IncomingMessage(r.RemoteEndPoint, r.Buffer);
        var deviceTypeMap = _deviceTypeMapper.GetDeviceType(message);
        message.DeviceType = deviceTypeMap?.DeviceType ?? DeviceTypeEnum.UnIdentified;
    })
    select message;

如果可以的话,最好使用第二个查询。

实际上,您并没有通过在循环中调用merge来合并可观察对象。您只需在每次迭代中创建一个可观察的对象。您想要做的是将您在循环中创建的所有可观察对象传递到最后进行合并,以创建单个合并的可观察对象。

编辑:把我上面的评论变成了答案。