可观察字节数组到对象的转换

本文关键字:对象 转换 数组 观察 字节 字节数 | 更新日期: 2023-09-27 18:24:42

我第一次在项目中使用Reactive,遇到了一个性能非常重要的问题。

概述:

我通过TCP套接字检索大量数据,我必须将其解析为对象并插入数据库。每条消息都有以下签名:

<payload-size> <payload>

其中size是uint32(4kb),它以字节为单位描述以下有效载荷的大小。

问题:

我想使用Reactive Framework提供的功能来并行化以下步骤(见下文),以最大限度地提高性能并避免成为瓶颈。此外,我要求为实现这一点提供"最佳实践"。

TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)

我已经实现了下面的代码,它为我提供了一个Observable (ArraySegment<byte>)

IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));

现在我想将Observable (ArraySegment<byte>)转换为Observable (Message)。我的第一个解决方案看起来有点像这样,因为我认为我可以像流一样使用可观察的。

使用TcpClient和Reactive Extensions 从Stream读取连续字节流

问题:

是否有可能(以及如何)使用以下方法创建可观察到的?或者你会推荐一种更好的方法吗?我真的很感激一个好的例子。

注意:Observable(ArraySegment)的行为就像一个流,所以我不知道它推送给我的数据的大小。(我需要实现某种缓冲区吗?或者Reactive Framework能帮我吗?)

    Observable (ArraySegment<byte>) 
    --> Buffer(4kb) 
    --> ReadSize --> Buffer(payload-size) 
    --> ReadPayload 
    --> Parse Payload
    --> (Start over)

提前感谢!:)

可观察字节数组到对象的转换

编辑:在Dimitri发表评论后,我在下面提出了一个修订后的解决方案。有一条线需要迫切的重构,但它似乎奏效了。。

使用了Window重载,因此我们可以编写自定义缓冲。

var hinge = new Subject<Unit>();
observableSocket
.SelectMany(i => i) // to IObservable<byte> 
.Window(() => hinge) // kinda-like-buffer
.Select(buff =>
{    
    return
        from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
        from payload in buff.Buffer(size)
        //Refactor line below! Window must be closed somehow..
        from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default)) 
        select payload;                     
})
.SelectMany(i=>i)
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(ConvertToMessage);

编辑2:删除旧的解决方案

这是我最终使用的解决方案。欢迎对可能的改进发表评论。

    public static IObservable<DataMessage> Convert(IObservable<ArraySegment<byte>> bytes)
            {
                const int headerSize = 12; // bytes
                return bytes.Scan(
                    new
                    {
                        Leftovers = new byte[0],
                        Messages = new List<DataMessage>(),
                        Header = (Header) null
                    },
                    (saved, current) =>
                    {
                        var data = ConcatdArrays(saved.Leftovers, current.ToArray());
                        var messages = new List<DataMessage>();
                        var header = saved.Header;
                        while (true)
                        {
                            // Header
                            if (header == null && data.Count >= headerSize)
                            {
                                header = ReadHeader(ref data, headerSize);
                            }
                            // Payload
                            else if (header != null)
                            {
                                var type = header.Type;
                                var size = DataItem.Size(type);
                                if (data.Count < size) break; // Still missing data
                                // Create new message with the gathered data
                                var payload = ReadPayload(ref data, size);
                                messages.Add(new DataMessage(header, payload));
                                header = null;
                            }
                            // Can't do more with the available data - try again next round.
                            else
                            {
                                break;
                            }
                        }
                        return new
                        {
                            Leftovers = data.ToArray(),
                            Messages = messages,
                            Header = header
                        };
                    }).SelectMany(list => list.Messages);