可观察字节数组到对象的转换
本文关键字:对象 转换 数组 观察 字节 字节数 | 更新日期: 2023-09-27 18:24:42
我第一次在项目中使用Reactive,遇到了一个性能非常重要的问题。
概述:
我通过TCP套接字检索大量数据,我必须将其解析为对象并插入数据库。每条消息都有以下签名:
<payload-size> <payload>
其中size是uint32(4kb),它以字节为单位描述以下有效载荷的大小。
问题:
我想使用Reactive Framework提供的功能来并行化以下步骤(见下文),以最大限度地提高性能并避免成为瓶颈。此外,我要求为实现这一点提供"最佳实践"。
TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)
我已经实现了下面的代码,它为我提供了一个Observable (ArraySegment<byte>)
。
IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));
现在我想将Observable (ArraySegment<byte>)
转换为Observable (Message)
。我的第一个解决方案看起来有点像这样,因为我认为我可以像流一样使用可观察的。
使用TcpClient和Reactive Extensions 从Stream读取连续字节流
问题:
是否有可能(以及如何)使用以下方法创建可观察到的?或者你会推荐一种更好的方法吗?我真的很感激一个好的例子。
注意:Observable(ArraySegment)的行为就像一个流,所以我不知道它推送给我的数据的大小。(我需要实现某种缓冲区吗?或者Reactive Framework能帮我吗?)
Observable (ArraySegment<byte>)
--> Buffer(4kb)
--> ReadSize --> Buffer(payload-size)
--> ReadPayload
--> Parse Payload
--> (Start over)
提前感谢!:)
编辑:在Dimitri发表评论后,我在下面提出了一个修订后的解决方案。有一条线需要迫切的重构,但它似乎奏效了。。
使用了Window
重载,因此我们可以编写自定义缓冲。
var hinge = new Subject<Unit>();
observableSocket
.SelectMany(i => i) // to IObservable<byte>
.Window(() => hinge) // kinda-like-buffer
.Select(buff =>
{
return
from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
from payload in buff.Buffer(size)
//Refactor line below! Window must be closed somehow..
from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default))
select payload;
})
.SelectMany(i=>i)
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(ConvertToMessage);
编辑2:删除旧的解决方案
这是我最终使用的解决方案。欢迎对可能的改进发表评论。
public static IObservable<DataMessage> Convert(IObservable<ArraySegment<byte>> bytes)
{
const int headerSize = 12; // bytes
return bytes.Scan(
new
{
Leftovers = new byte[0],
Messages = new List<DataMessage>(),
Header = (Header) null
},
(saved, current) =>
{
var data = ConcatdArrays(saved.Leftovers, current.ToArray());
var messages = new List<DataMessage>();
var header = saved.Header;
while (true)
{
// Header
if (header == null && data.Count >= headerSize)
{
header = ReadHeader(ref data, headerSize);
}
// Payload
else if (header != null)
{
var type = header.Type;
var size = DataItem.Size(type);
if (data.Count < size) break; // Still missing data
// Create new message with the gathered data
var payload = ReadPayload(ref data, size);
messages.Add(new DataMessage(header, payload));
header = null;
}
// Can't do more with the available data - try again next round.
else
{
break;
}
}
return new
{
Leftovers = data.ToArray(),
Messages = messages,
Header = header
};
}).SelectMany(list => list.Messages);