如何使用Rx.NET中的管道混合网络和文件系统IO
本文关键字:网络 混合 文件系统 IO 管道 何使用 Rx NET | 更新日期: 2023-09-27 18:16:15
我有以下要求:
- 收集多个远程站点的某些信息
- 将信息序列化到磁盘
- 联系同一站点,确认数据收集成功。
这是一个非常简化的流程,真正的流程还必须处理故障并具有其他方面,我认为这些方面与我的问题无关,至少目前看来是这样。
无论如何,这是我如何实现所描述的流程:
var data = await GetSitesSource()
.Select(site => Observable
.FromAsync(() => GetInformationFromSiteAsync(site))
.Select(site.MakeKeyValuePair))
.Merge(maxConcurrentSiteRequests)
.ToList();
if (data.Count > 0)
{
var filePath = GetFilePath();
using (var w = new StreamWriter(filePath))
{
await w.WriteAsync(YieldLines(data));
}
var tsUTC = DateTime.UtcNow;
await data.ToObservable()
.Select(o => Observable.FromAsync(() => AckInformationFromSiteAsync(o.Key, tsUTC, o.Value.InformationId)))
.Merge(maxConcurrentSiteRequests);
}
地点:
-
MakeKeyValuePair
是一个返回KeyValuePair<K,V>
实例的扩展方法 -
YieldLines
将data
转化为IEnumerable<string>
-
WriteAsync
是一个虚构的扩展方法,将一系列字符串写入StreamWriter
这似乎不是一个很好的实现,因为我没有利用这样一个事实,即我可以在第一个Merge
运算符出现时开始写出记录。
我可以使用SelectMany
+ Merge(1)
运算符异步地将块写入文件(顺序无关紧要),但是我如何确保各自的StreamWriter
仅在需要时初始化并正确处理?因为如果没有数据,我甚至不想初始化StreamWriter
.
我的问题-如何重写这段代码,使可观察管道不会在中间中断写入文件?它应该包括所有三个阶段:
- 从多个站点获取数据
- 逐条写入数据块,顺序无关
- 所有数据写入后确认数据
我还没有测试过这个,但是你的代码都不排除将它连接在一起。所以你可以这样做:
//The ToObservable extension for Task is only available through
using System.Reactive.Threading.Tasks;
GetSitesSource()
.Select(site => Observable
.FromAsync(() => GetInformationFromSiteAsync(site))
.Select(site.MakeKeyValuePair))
.Merge(maxConcurrentSiteRequests)
.ToList()
//Only proceed if we received data
.Where(data => data.Count > 0)
.SelectMany(data =>
//Gives the StreamWriter the same lifetime as this Observable once it subscribes
Observable.Using(
() => new StreamWriter(GetFilePath()),
(w) => w.WriteAsync(YieldLines(data)).ToObservable()),
//We are interested in the original data value, not the write result
(data, _) => data)
//Attach a timestamp of when data passed through here
.Timestamp()
.SelectMany(o=> {
var ts = o.Timestamp;
var data= o.Value;
//This is actually returning IEnumerable<IObservable<T>> but merge
//will implicitly handle it.
return data.Select(i => Observable.FromAsync(() =>
AckInformationFromSiteAsync(i.Key, ts,
i.Value.InformationId)))
.Merge(maxConcurrentSiteRequests);
})
//Handle the return values, fatal errors and the completion of the stream.
.Subscribe();
更全面地回答你的问题
Using
操作符将一个必须实现IDisposable
的资源绑定到Observable的生命周期。第一个参数是一个工厂函数,它会在Observable被订阅时被调用一次。