可观察的网络 IO 解析

本文关键字:IO 解析 网络 观察 | 更新日期: 2023-09-27 18:32:54

我正在尝试使用 Rx 从 TCPClient 接收流中读取并将数据解析为字符串的 IObservable,由换行符"''r'"分隔以下是我从套接字流接收的方式......

var messages = new Subject<string>();
var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };
Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

这是我想出的解析字符串的方法。 这目前不起作用...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("'r'n"))
            );
消息主题以块

的形式接收消息,因此我尝试将它们连接起来并测试串联的字符串是否包含换行符,从而向缓冲区发出关闭并输出缓冲块的信号。 不知道为什么它不起作用。 似乎我只从 obsString 中获取了第一个块。

所以我正在寻找两件事。 我想简化io流的读取并消除消息主题的使用。 其次,我想让我的字符串解析工作。 我已经对此进行了一段时间的黑客攻击,但无法提出可行的解决方案。 我是 Rx 的初学者。

编辑:这是问题解决后的成品....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("'r'n") ? "" : a) + b)
            .Where(x => x.EndsWith("'r'n"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("'n", ""));

"ReceiveUntilComplete"是RXX项目的扩展。

可观察的网络 IO 解析

messages
    .Scan(String.Empty, (a, b) => (a.EndsWith("'r'n") ? "" : a) + b)
    .Where(x => x.EndsWith("'r'n"))

与其Subscribe并使用Subject,不如试试Select

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

现在假设这一切都进入了一个名为 messages 的新可观察量,你的下一个问题是在这一行中

var obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("'r'n"))
            );

您同时使用BufferScan,并尝试在两者中做同样的事情!请注意,Buffer需要一个关闭选择器。

你真正想要的是:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("'r'n")))
                         .Select(buffered => String.Join(buffered));

这为 Buffered 提供了关于何时关闭窗口(当它包含 ''r' 时(的可观察量,并给出了选择要连接的缓冲量。这将导致拆分字符串的新可观察量。

一个问题是您仍然可以在块的中间使用新行,这会导致问题。一个简单的想法是观察字符而不是完整的字符串块,例如:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

然后,您可以执行messages.Where(c => c != ''r')跳过'r并将缓冲区更改为:

var obsStrings = messages.Buffer(() => messages.Where(x => x == ''n')))
                         .Select(buffered => String.Join("", buffered));