自动重置事件和多个集合

本文关键字:集合 事件 | 更新日期: 2023-09-27 18:18:10

我正在尝试围绕堆栈设计一个数据结构,该结构阻塞直到堆栈有可用的项目。我尝试使用AutoResetEvent但我想我误解了同步过程的工作原理。基本上,查看以下代码,当没有任何可用内容时,我正在尝试从堆栈中弹出。

似乎AutoResetEvent的行为就像一个信号量。这是对的吗?我可以摆脱BlockingStack.Get()中的Set()并完成它吗?或者这会导致我只使用我的一个堆栈项目的情况。

public class BlockingStack
{
    private Stack<MyType> _internalStack;
    private AutoResetEvent _blockUntilAvailable;
    public BlockingStack()
    {
        _internalStack = new Stack<MyType>(5);
        _blockUntilAvailable = new AutoResetEvent(false);
        for (int i = 0; i < 5; ++i)
        {
            var obj = new MyType();
            Add(obj);
        }
    }
    public MyType Get()
    {
        _blockUntilAvailable.WatiOne();
        lock (_internalStack)
        {
            var obj = _internalStack.Pop();
            if (_internalStack.Count > 0)
            {
                _blockUntilAvailable.Set(); // do I need to do this?
            }
            return obj;
        }
    }
    public void Add(MyType obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            _blockUntilAvailable.Set();
        }
    }
}

我的假设是,当一个线程通过WaitOne()函数调用时,所有等待线程的AutoResetEvent重置。但是,似乎有多个线程进入。除非我在某处搞砸了我的逻辑。

编辑:这是针对Silverlight的。

自动重置事件和多个集合

你最好

使用阻塞集合,除非你只是想了解线程是如何工作的。这将为您提供一个由堆栈支持的阻塞集合:

ConcurrentStack<SomeType> MyStack = new ConcurrentStack<SomeType>();
BlockingCollection<SomeType> SharedStack = new BlockingCollection<SomeType>(MyStack)

然后,您可以以线程安全的方式访问它,并为您正确完成所有阻塞。看这里

您可以通过调用 sharedStack.Take() 来使用 sharedStack,然后它会阻止获取,直到从堆栈中获取某些内容。


编辑:花了我一段时间(和两次尝试(,但我认为我已经解决了你的问题。

考虑一个空堆栈,其中有 3 个线程在等待事件。

调用 Add,堆栈有一个对象,并且允许一个线程通过事件。

立即再次调用"添加"。

第一个线程现在等待从 Add 获取锁。

Add 将第二个对象添加到堆栈中,并允许另一个线程通过事件。

现在堆栈上的两个对象和通过事件的 2 个线程,都在等待锁。

第一个获取线程现在需要锁定和弹出。看到堆栈上的一个对象静止并调用 SET。

通过事件允许第三个线程。

第二个 Get 线程现在锁定并弹出。在堆栈中看不到任何内容,也不会调用 set。

但。太迟了。第三个线程已经被允许通过,所以当第二个线程放弃锁时,第三个线程会尝试从空堆栈弹出并抛出。

不,您当前的代码毫无意义。目前,每次调用 Get 方法(.WaitOne调用(时都会阻塞线程。

您可能想要类似以下内容:
public class BlockingStack<T>
{
    private Stack<T> _internalStack;
    private AutoResetEvent _blockUntilAvailable;
    public BlockingStack()
    {
        _internalStack = new Stack<T>(5);
        _blockUntilAvailable = new AutoResetEvent(false);
    }
    public T Pop()
    {
        lock (_internalStack)
        {
            if (_internalStack.Count == 0)
                _blockUntilAvailable.WaitOne();
            return _internalStack.Pop();
        }
    }
    public void Push(T obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            if(_internalStack.Count == 0)
                _blockUntilAvailable.Set();
        }
    }
}

这个想法是,如果_internalStack中的当前项数为 0,那么它应该等待来自 Push 方法的信号。一旦收到信号,它就会继续前进并从堆栈中弹出一个项目。

编辑:上面的代码有 2 个问题:

  1. 每当Pop.WaitOne块时,它都不会释放锁定 _internalStack,因此Push永远无法获得锁。

  2. 当在同一线程上多次调用Pop时,它们共享自动重置事件的相同初始状态 - 例如。推送信号 添加项目时AutoResetEvent。现在,当我弹出一个项目时,它第一次工作正常,因为实际上有一个项目 Stack .但是第二次,Stack没有价值,所以它通过在AutoResetEvent上调用.WaitOne来等待 - 但自从对Push的调用表示此事件,它将返回 true,并且没有像预期的那样等待。

一个(有效的(替代方案:

public class BlockingStack<T>
{
    private Stack<T> _internalStack;
    public BlockingStack()
    {
        _internalStack = new Stack<T>(5);
    }
    public T Pop()
    {
        lock (_internalStack)
        {
            if (_internalStack.Count == 0)
                Monitor.Wait(_internalStack);
            return _internalStack.Pop();
        }
    }
    public void Push(T obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            Monitor.Pulse(_internalStack);
        }
    }
}

我没有验证基于Monitor的解决方案,但我确实编写了一个似乎有效的基于信号量的解决方案:

public class Semaphore
{
    private int _count;
    private int _maximum;
    private object _countGuard;
    public Semaphore(int maximum)
    {
        _count = 0;
        _maximum = maximum;
        _countGuard = new object();
    }
    public void WaitOne()
    {
        while (true)
        {
            lock (_countGuard)
            {
                if (_count < _maximum)
                {
                    _count++;
                    return;
                }
            }
            Thread.Sleep(50);
        }
    }
    public void ReleaseOne()
    {
        lock (_countGuard)
        {
            if (_count > 0)
            {
                _count--;
            }
        }
    }
}
public class BlockingStack
{
    private Stack<MyType> _internalStack;
    private Semaphore _blockUntilAvailable;
    public BlockingStack()
    {
        _internalStack = new Stack<MyType>(5);
        _blockUntilAvailable = new Semaphore(5);
        for (int i = 0; i < 5; ++i)
        {
            var obj = new MyType();
            Add(obj);
        }
    }
    public MyType Get()
    {
        _blockUntilAvailable.WaitOne();
        lock (_internalStack)
        {
            var obj = _internalStack.Pop();
            return obj;
        }
    }
    public void Add(MyType obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            _blockUntilAvailable.ReleaseOne();
        }
    }
}