如何拆分和管道多个NAudio流

本文关键字:管道 NAudio 何拆分 拆分 | 更新日期: 2023-09-27 18:00:55

我有一个C#项目,处理来自Kinect 1、Kinect 2、麦克风或其他任何东西的输入音频流。

waveIn.DataAvailable += (object sender, WaveInEventArgs e) => {
  lock(buffer){
    var pos = buffer.Position;
              buffer.Write(e.Buffer, 0, e.BytesRecorded);
              buffer.Position = pos;
  }
};

缓冲区变量是来自组件a的流,该流将由处理流的SpeechRecognition组件B处理。

我将添加新的组件C、D、E,在Streams上工作,以计算音高、检测声音、进行指纹或其他任何操作。。。

如何为组件C、D、E复制该Stream

  • 组件A发送一个事件"我有一个流做你想做的事"我不想通过一个事件来反转逻辑"给我你的流">

  • 我正在寻找一个"MultiStream",它可以给我一个Stream实例,并将处理的工作

组件A

var MultiStream buffer = new MultiStream()
...
SendMyEventWith(buffer)

成分B、C、D、E

public void HandleMyEvent(MultiStream buffer){
  var stream = buffer.GetNewStream();
  var engine = new EngineComponentB()
      engine.SetStream(stream);
}
  • MultiStream必须是Stream to wrap Write((方法(因为Stream没有可用的数据机制(
  • 如果组件B对Stream进行Dispose((处理,那么MultiStream应该将其从数组中删除吗
  • MultiStream必须在Read((上引发异常才能要求使用GetNewStream((

编辑:Kinect 1提供流本身…:-(我应该使用线程将其泵入MultiStream吗?

有人上过那种MultiStream课程吗?

感谢

如何拆分和管道多个NAudio流

我不确定这是最好的方法,还是比上一个答案更好,我也不能保证这段代码是完美的,但我编码了一些你想要的东西,因为它很有趣——MultiStream类。

您可以在此处找到该类的代码:http://pastie.org/10289142

用法示例:

MultiStream ms = new MultiStream();
Stream copy1 = ms.CloneStream();
ms.Read( ... );
Stream copy2 = ms.CloneStream();
ms.Read( ... );

运行示例后,copy1copy2将包含相同的数据,并且它们将随着MultiStream的写入而继续更新。您可以分别读取、更新位置和处理克隆的流。如果已处置,则克隆的流将从MultiStream中删除,而处置Multistream将关闭所有相关的和克隆的流(如果这不是您想要的行为,则可以对此进行更改(。尝试写入克隆的流将引发不支持的异常。

不知何故,我认为流并不真正适合你想要做的事情。你正在设置一种情况,即程序的长期运行将毫无明显原因地不断扩展数据需求。

我建议使用pub/sub模型,将接收到的音频数据发布给订阅者,最好使用多线程方法,以最大限度地减少坏订阅者的影响。这里可以找到一些想法。

我以前用一个实现IObserver<byte[]>并使用Queue<byte[]>存储样本块的处理器类完成过这项工作,直到进程线程为它们做好准备。以下是基本类:

public abstract class BufferedObserver<T> : IObserver<T>, IDisposable
{
    private object _lck = new object();
    private IDisposable _subscription = null;
    public bool Subscribed { get { return _subscription != null; } }
    private bool _completed = false;
    public bool Completed { get { return _completed; } }
    protected readonly Queue<T> _queue = new Queue<T>();
    protected bool DataAvailable { get { lock(_lck) { return _queue.Any(); } } }
    protected int AvailableCount { get { lock (_lck) { return _queue.Count; } } }
    protected BufferedObserver()
    {
    }
    protected BufferedObserver(IObservable<T> observable)
    {
        SubscribeTo(observable);
    }
    public virtual void Dispose()
    {
        if (_subscription != null)
        {
            _subscription.Dispose();
            _subscription = null;
        }
    }
    public void SubscribeTo(IObservable<T> observable)
    {
        if (_subscription != null)
            _subscription.Dispose();
        _subscription = observable.Subscribe(this);
        _completed = false;
    }
    public virtual void OnCompleted()
    {
        _completed = true;
    }
    public virtual void OnError(Exception error)
    { }
    public virtual void OnNext(T value)
    {
        lock (_lck)
            _queue.Enqueue(value);
    }
    protected bool GetNext(ref T buffer)
    {
        lock (_lck)
        {
            if (!_queue.Any())
                return false;
            buffer = _queue.Dequeue();
            return true;
        }
    }
    protected T NextOrDefault()
    {
        T buffer = default(T);
        GetNext(ref buffer);
        return buffer;
    }
}
public abstract class Processor<T> : BufferedObserver<T>
{
    private object _lck = new object();
    private Thread _thread = null;
    private object _cancel_lck = new object();
    private bool _cancel_requested = false;
    private bool CancelRequested
    {
        get { lock(_cancel_lck) return _cancel_requested; }
        set { lock(_cancel_lck) _cancel_requested = value; }
    }
    public bool Running { get { return _thread == null ? false : _thread.IsAlive; } }
    public bool Finished { get { return _thread == null ? false : !_thread.IsAlive; } }
    protected Processor(IObservable<T> observable)
        : base(observable)
    { }
    public override void Dispose()
    {
        if (_thread != null && _thread.IsAlive)
        {
            //CancelRequested = true;
            _thread.Join(5000);
        }
        base.Dispose();
    }
    public bool Start()
    {
        if (_thread != null)
            return false;
        _thread = new Thread(threadfunc);
        _thread.Start();
        return true;
    }
    private void threadfunc()
    {
        while (!CancelRequested && (!Completed || _queue.Any()))
        {
            if (DataAvailable)
            {
                T data = NextOrDefault();
                if (data != null && !data.Equals(default(T)))
                    ProcessData(data);
            }
            else
                Thread.Sleep(10);
        }
    }
    // implement this in a sub-class to process the blocks
    protected abstract void ProcessData(T data);
}

通过这种方式,您只在需要的时候保留数据,并且可以将所需数量的进程线程附加到相同的可观察数据源。


为了完整起见,这里有一个实现IObservable<T>的泛型类,这样您就可以看到它是如何组合在一起的。这个甚至有评论:

/// <summary>Generic IObservable implementation</summary>
/// <typeparam name="T">Type of messages being observed</typeparam>
public class Observable<T> : IObservable<T>
{
    /// <summary>Subscription class to manage unsubscription of observers.</summary>
    private class Subscription : IDisposable
    {
        /// <summary>Observer list that this subscription relates to</summary>
        public readonly ConcurrentBag<IObserver<T>> _observers;
        /// <summary>Observer to manage</summary>
        public readonly IObserver<T> _observer;
        /// <summary>Initialize subscription</summary>
        /// <param name="observers">List of subscribed observers to unsubscribe from</param>
        /// <param name="observer">Observer to manage</param>
        public Subscription(ConcurrentBag<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }
        /// <summary>On disposal remove the subscriber from the subscription list</summary>
        public void Dispose()
        {
            IObserver<T> observer;
            if (_observers != null && _observers.Contains(_observer))
                _observers.TryTake(out observer);
        }
    }
    // list of subscribed observers
    private readonly ConcurrentBag<IObserver<T>> _observers = new ConcurrentBag<IObserver<T>>();
    /// <summary>Subscribe an observer to this observable</summary>
    /// <param name="observer">Observer instance to subscribe</param>
    /// <returns>A subscription object that unsubscribes on destruction</returns>
    /// <remarks>Always returns a subscription.  Ensure that previous subscriptions are disposed
    /// before re-subscribing.</remarks>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // only add observer if it doesn't already exist:
        if (!_observers.Contains(observer))
            _observers.Add(observer);
        // ...but always return a new subscription.
        return new Subscription(_observers, observer);
    }
    // delegate type for threaded invocation of IObserver.OnNext method
    private delegate void delNext(T value);
    /// <summary>Send <paramref name="data"/> to the OnNext methods of each subscriber</summary>
    /// <param name="data">Data object to send to subscribers</param>
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
    public void Notify(T data)
    {
        foreach (var observer in _observers)
        {
            delNext handler = observer.OnNext;
            handler.BeginInvoke(data, null, null);
        }
    }
    // delegate type for asynchronous invocation of IObserver.OnComplete method
    private delegate void delComplete();
    /// <summary>Notify all subscribers that the observable has completed</summary>
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
    public void NotifyComplete()
    {
        foreach (var observer in _observers)
        {
            delComplete handler = observer.OnCompleted;
            handler.BeginInvoke(null, null);
        }
    }
}

现在,您可以创建一个Observable<byte[]>,用作感兴趣的Process<byte[]>实例的发送器。从输入流、音频读取器等中提取数据块,并将其传递给Notify方法。只需确保事先克隆阵列。。。