等待的基于任务的队列

本文关键字:任务 队列 于任务 等待 | 更新日期: 2023-09-27 18:12:00

我想知道是否存在ConcurrentQueue的实现/包装器,类似于BlockingCollection,从集合中获取不会阻塞,而是异步的,并将导致异步等待,直到项目被放置在队列中。

我已经提出了我自己的实现,但它似乎不像预期的那样执行。我在想我是不是在重新发明一些已经存在的东西。

这是我的实现:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }
    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }
    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

等待的基于任务的队列

我不知道有没有无锁的解决方案,但是您可以看看新的Dataflow库,它是异步CTP的一部分。一个简单的BufferBlock<T>就足够了,例如:

BufferBlock<int> buffer = new BufferBlock<int>();

生产和消费最容易通过数据流块类型的扩展方法来完成。

生产过程很简单:

buffer.Post(13);

和消费是async-ready:

int item = await buffer.ReceiveAsync();

如果可能的话,我建议你使用Dataflow;要使这样的缓冲区既有效又正确,比最初看起来要困难得多。

c# 8.0 IAsyncEnumerable和Dataflow库的简单方法

// Instatiate an async queue
var queue = new AsyncQueue<int>();
// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

AsyncQueue的实现如下:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();
    public void Enqueue(T item) =>
        _bufferBlock.Post(item);
    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try {
            // Return new elements until cancellationToken is triggered.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }
    }
}

现在有一个官方的方法来做到这一点:System.Threading.Channels。它被内置于。net core 3.0及更高版本(包括。net 5.0和6.0)的核心运行时中,但它也可以作为。net Standard 2.0和2.1的NuGet包使用。你可以在这里阅读文档。

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

排队工作:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

完成通道:

channel.Writer.TryComplete();

从通道读取:

var i = await channel.Reader.ReadAsync();

或者,如果你有。net Core 3.0或更高版本:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}

实现这一点的一个简单易行的方法是使用SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();
    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }
    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        semaphore.Wait(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

这样做的美妙之处在于SemaphoreSlim处理了实现Wait()WaitAsync()功能的所有复杂性。缺点是队列长度由信号量和队列本身跟踪,它们都神奇地保持同步。

我的尝试(它有一个事件,当一个"promise"被创建时,它可以被外部生产者使用,以知道何时产生更多的项目):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();
    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }
    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);
        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }
            _bufferQueue.Enqueue(item);
        }            
    }
    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;
        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());
                    _promisesQueue.Enqueue(promise);
                    this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);
                    return promise.Task;
                }
            }
        }
        return Task.FromResult(item);
    }
    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }
    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}

对于您的用例来说,这可能有点小题大做了(考虑到学习曲线),但是Reactive extensions提供了异步组合所需的所有胶水。

你基本上订阅了更改,当它们可用时它们会推送给你,你可以让系统在一个单独的线程上推送更改。

查看https://github.com/somdoron/AsyncCollection,你既可以异步解队列,也可以使用c# 8.0 IAsyncEnumerable。

API与BlockingCollection非常相似。

AsyncCollection<int> collection = new AsyncCollection<int>();
var t = Task.Run(async () =>
{
    while (!collection.IsCompleted)
    {
        var item = await collection.TakeAsync();
        // process
    }
});
for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}
collection.CompleteAdding();
t.Wait();
与IAsyncEnumeable:

AsyncCollection<int> collection = new AsyncCollection<int>();
var t = Task.Run(async () =>
{
    await foreach (var item in collection)
    {
        // process
    }
});
for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}
collection.CompleteAdding();
t.Wait();

这是我目前使用的实现。

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }
    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }
    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

它工作得很好,但是在queueSyncLock上有很多争论,因为我使用CancellationToken来取消一些等待任务。当然,这导致了相当少的阻塞,我将看到一个BlockingCollection,但是……

我想知道是否有一种更平滑,无锁的方法来达到相同的目的

8年后,我遇到了这个问题,并准备实现nuget包/命名空间中的MS AsyncQueue<T>类:Microsoft.VisualStudio.Threading

感谢@Theodor Zoulias提到这个api可能已经过时了,DataFlow库将是一个很好的选择。

所以我编辑我的AsyncQueue<>实现使用BufferBlock<>。几乎一样,但效果更好。

我在asp.net Core后台线程中使用它,它是完全异步运行的。

protected async Task MyRun()
{
    BufferBlock<MyObj> queue = new BufferBlock<MyObj>();
    Task enqueueTask = StartDataIteration(queue);
    while (await queue.OutputAvailableAsync())
    {
        var myObj = queue.Receive();
        // do something with myObj
    }
}
public async Task StartDataIteration(BufferBlock<MyObj> queue)
{
    var cursor = await RunQuery();
    while(await cursor.Next()) { 
        queue.Post(cursor.Current);
    }
    queue.Complete(); // <<< signals the consumer when queue.Count reaches 0
}

我发现使用queue. outputavailableasync()修复了我使用AsyncQueue<>时遇到的问题——试图确定队列何时完成,而不必检查dequeue任务。

您可以使用BlockingCollection(使用默认的ConcurrentQueue)并将对Take的调用包装在Task中,以便您可以await它:

var bc = new BlockingCollection<T>();
T element = await Task.Run( () => bc.Take() );