IObservable<byte[]> conversion

本文关键字:gt conversion byte lt IObservable | 更新日期: 2023-09-27 17:59:28

如何将IObservable转换为byte[]?

我想将IObservable转换为wav文件并将其保存在磁盘上。

DisOutput d;
File.WriteAllBytes("outputsend.wav", d);

我有一个数组IOobservable,我想将其转换为byte[],以便写入文件。如何将IObservable转换为byte[],因为WriteAllBytes将byte[]作为输入

IObservable<byte[]> conversion

我假设您有一个框架,可以以IObservable<Byte>的形式动态生成WAV文件。据推测,当生成完成时,IObservable<Byte>将激发OnCompleted以发出信号。

IObservable<Byte> observable;

最简单的方法是使用ToList,它将生成一个IObservable<IList<Byte>>,当源可观测序列完成时激发:

observable.ToList().Subscribe(list => File.WriteAllBytes("outputsend.wav", list.ToArray());

发生的情况是,ToList运算符将收集随时间增长的列表中的所有传入字节。当传入序列完成时,会通知订阅者,在这种情况下,字节列表会写入文件。

但是,不需要缓冲内存中的字节。相反,它们可以直接写入文件。当传入的字节流完成时,关闭文件是很重要的,这可以使用这种稍微复杂但也更有效的扩展方法来实现:

static class ObservableExtensions {
  public static IObservable<Unit> WriteToFile(this IObservable<Byte> source, String fileName) {
    return Observable.Create<Unit>(
      observer => {
        var fileStream = new SerialDisposable();
        return new CompositeDisposable(
          source.Subscribe(
            value => {
              try {
                if (fileStream.Disposable == null)
                  fileStream.Disposable = File.Create(fileName);
                ((FileStream) fileStream.Disposable).WriteByte(value);
              }
              catch (SystemException ex) {
                observer.OnError(ex);
              }
            },
            observer.OnError,
            () => {
              observer.OnNext(Unit.Default);
              observer.OnCompleted();
            }
          ),
          fileStream
        );
      }
    );
  }
}

你可以使用这样的扩展方法:

observable.WriteToFile("outputsend.wav").Subscribe(_ => Console.WriteLine("Done"));

要处理错误,您可以使用Subscribe:的另一个过载

observable.WriteToFile("outputsend.wav").Subscribe(
  _ => Console.WriteLine("Done"),
  ex => Console.WriteLine(ex)
);

这将向控制台写入异常。在生产质量应用中需要一种更复杂的方法。