如何创建一个FIFO/强信号量

本文关键字:FIFO 信号量 一个 何创建 创建 | 更新日期: 2023-09-27 18:01:18

我需要在c#中编写自己的FIFO/强信号量,使用我自己的信号量类作为基础。我发现了这个例子,但它不太正确,因为我不应该使用Monitor。进入/退出。

这些是我的常规信号量的方法,我想知道是否有一种简单的方法将其调整为FIFO。

public virtual void Acquire()
{
    lock (this)
    {
        while (uintTokens == 0)
        {
            Monitor.Wait(this);
        }
        uintTokens--;
    }
}
public virtual void Release(uint tokens = 1)
{
    lock (this)
    {
        uintTokens += tokens;
        Monitor.PulseAll(this);
    }
}

如何创建一个FIFO/强信号量

所以SemaphoreSlim给了我们一个很好的起点,所以我们将从在一个新类中包装其中一个开始,并将除了wait方法之外的所有内容定向到该信号量

为了获得类似队列的行为,我们需要一个队列对象,并确保它在面对多线程访问时是安全的,我们将使用ConcurrentQueue

在这个队列中我们将放置TaskCompletionSource对象。当我们想要开始等待时,它可以创建一个TCS,将其添加到队列中,然后通知信号量异步地从队列中弹出下一个项目,并在等待结束时将其标记为"完成"。我们将知道,总是有相等或更少数量的延续,因为队列中有项目。

然后我们就等着TCS的Task

我们也可以简单地创建一个返回任务的WaitAsync方法,通过直接返回它而不是等待它。

public class SemaphoreQueue
{
    private SemaphoreSlim semaphore;
    private ConcurrentQueue<TaskCompletionSource<bool>> queue =
        new ConcurrentQueue<TaskCompletionSource<bool>>();
    public SemaphoreQueue(int initialCount)
    {
        semaphore = new SemaphoreSlim(initialCount);
    }
    public SemaphoreQueue(int initialCount, int maxCount)
    {
        semaphore = new SemaphoreSlim(initialCount, maxCount);
    }
    public void Wait()
    {
        WaitAsync().Wait();
    }
    public Task WaitAsync()
    {
        var tcs = new TaskCompletionSource<bool>();
        queue.Enqueue(tcs);
        semaphore.WaitAsync().ContinueWith(t =>
        {
            TaskCompletionSource<bool> popped;
            if (queue.TryDequeue(out popped))
                popped.SetResult(true);
        });
        return tcs.Task;
    }
    public void Release()
    {
        semaphore.Release();
    }
}

我已经创建了一个FifoSemaphore类,我成功地在我的解决方案中使用了它。当前的限制是它的行为类似于信号量(1,1)。

public class FifoSemaphore
{
    private readonly object lockObj = new object();
    private List<Semaphore> WaitingQueue = new List<Semaphore>();

    private Semaphore RequestNewSemaphore()
    {
        lock (lockObj)
        {
            Semaphore newSemaphore = new Semaphore(1, 1);
            newSemaphore.WaitOne();
            return newSemaphore;
        }
    }
    #region Public Functions
    public void Release()
    {
        lock (lockObj)
        {
            WaitingQueue.RemoveAt(0);
            if (WaitingQueue.Count > 0)
            {
                WaitingQueue[0].Release();
            }
        }
    }
    public void WaitOne()
    {
        Semaphore semaphore = RequestNewSemaphore();
        lock (lockObj)
        {
            WaitingQueue.Add(semaphore);
            semaphore.Release();
            if(WaitingQueue.Count > 1)
            {
                semaphore.WaitOne();
            }
        }
        semaphore.WaitOne();
    }
    #endregion
}

的用法与常规信号量类似:

FifoSemaphore fifoSemaphore = new FifoSemaphore();

每个线程:

fifoSemaphore.WaitOne();
//do work
fifoSemaphore.Release();