. net中线程队列实现的问题
本文关键字:问题 实现 队列 线程 net | 更新日期: 2023-09-27 18:05:23
我试图在。net中实现一个线程队列,但是当我通过测试运行它时,我遇到了一些麻烦。
实现允许放弃线程的一些复杂性,因为它强制只有一个线程将项目放入队列,并且只有一个线程将它们取出(这是设计的)。
问题是,有时,Take()会跳过一个项目,好像它从来没有存在过,在我的测试中,我将得到"预期:736,但是:737"。我在这段代码中看不到任何地方会出现这种效果;Put只会放置在最后一项之后(所以它不应该影响这个)。m_Head直接)和Take使用Interlocked。交换从头部取出物品。
这个实现是如何允许这个问题发生的?
实现:
using System;
using System.Threading;
#pragma warning disable 420
namespace Tychaia.Threading
{
public class TaskPipeline<T>
{
private int? m_InputThread;
private int? m_OutputThread;
private volatile TaskPipelineEntry<T> m_Head;
/// <summary>
/// Creates a new TaskPipeline with the current thread being
/// considered to be the input side of the pipeline. The
/// output thread should call Connect().
/// </summary>
public TaskPipeline()
{
this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
this.m_OutputThread = null;
}
/// <summary>
/// Connects the current thread as the output of the pipeline.
/// </summary>
public void Connect()
{
if (this.m_OutputThread != null)
throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
}
/// <summary>
/// Puts an item into the queue to be processed.
/// </summary>
/// <param name="value">Value.</param>
public void Put(T value)
{
if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");
// Walk the queued items until we find one that
// has Next set to null.
var head = this.m_Head;
while (head != null)
{
if (head.Next != null)
head = head.Next;
if (head.Next == null)
break;
}
if (head == null)
this.m_Head = new TaskPipelineEntry<T> { Value = value };
else
head.Next = new TaskPipelineEntry<T> { Value = value };
}
/// <summary>
/// Takes the next item from the pipeline, or blocks until an item
/// is recieved.
/// </summary>
/// <returns>The next item.</returns>
public T Take()
{
if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");
// Wait until there is an item to take.
var spin = new SpinWait();
while (this.m_Head == null)
spin.SpinOnce();
// Return the item and exchange the current head with
// the next item, all in an atomic operation.
return Interlocked.Exchange(ref this.m_Head, this.m_Head.Next).Value;
}
}
}
#pragma warning restore 420
测试失败:
[Test]
public void TestPipelineParallelTo100()
{
var random = new Random();
var pipeline = new TaskPipeline<int>();
var success = true;
int expected = 0, actual = 0;
ThreadStart processor = () =>
{
pipeline.Connect();
for (int i = 0; i < 100; i++)
{
var v = pipeline.Take();
if (v != i)
{
success = false;
expected = i;
actual = v;
break;
}
Thread.Sleep(random.Next(1, 10));
}
};
var thread = new Thread(processor);
thread.Start();
for (int i = 0; i < 100; i++)
{
pipeline.Put(i);
Thread.Sleep(random.Next(1, 10));
}
thread.Join();
if (!success)
Assert.AreEqual(expected, actual);
}
如果在Take
中读取m_Head.Next
的值后将其赋值给Interlocked.Exchange(ref this.m_Head, this.m_Head.Next)
,则指针将丢失,因为访问它的唯一方法是通过m_Head
。
-
Take
读取m_Head.Next
(==null
) -
Put
写入m_Head.Next
(!=null
) -
Take
写入m_Head
(==null
)
Edit:这应该可以工作。我使用了一个非空的哨兵值和Interlocked.CompareExchange
,以确保Put
不会试图重用Take
已经删除的条目。
编辑2:调整到Take
.
编辑3:我相信如果确定的尾巴是Entry.Sentinel
,我仍然需要在Put
中添加goto retry;
。
using System;
using System.Threading;
#pragma warning disable 420
namespace Tychaia.Threading
{
public class TaskPipeline<T>
{
private int? m_InputThread;
private int? m_OutputThread;
private volatile Entry m_Head;
private sealed class Entry
{
public static readonly Entry Sentinel = new Entry(default(T));
public readonly T Value;
public Entry Next;
public Entry(T value)
{
Value = value;
Next = null;
}
}
/// <summary>
/// Creates a new TaskPipeline with the current thread being
/// considered to be the input side of the pipeline. The
/// output thread should call Connect().
/// </summary>
public TaskPipeline()
{
this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
this.m_OutputThread = null;
}
/// <summary>
/// Connects the current thread as the output of the pipeline.
/// </summary>
public void Connect()
{
if (this.m_OutputThread != null)
throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
}
/// <summary>
/// Puts an item into the queue to be processed.
/// </summary>
/// <param name="value">Value.</param>
public void Put(T value)
{
if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");
retry:
// Walk the queued items until we find one that
// has Next set to null.
var head = this.m_Head;
while (head != null)
{
if (head.Next != null)
head = head.Next;
if (head.Next == null)
break;
}
if (head == null)
{
if (Interlocked.CompareExchange(ref m_Head, new Entry(value), null) != null)
goto retry;
}
else
{
if (Interlocked.CompareExchange(ref head.Next, new Entry(value), null) != null)
goto retry;
}
}
/// <summary>
/// Takes the next item from the pipeline, or blocks until an item
/// is recieved.
/// </summary>
/// <returns>The next item.</returns>
public T Take()
{
if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");
// Wait until there is an item to take.
var spin = new SpinWait();
while (this.m_Head == null)
spin.SpinOnce();
// Return the item and exchange the current head with
// the next item, all in an atomic operation.
Entry head = m_Head;
retry:
Entry next = head.Next;
// replace m_Head.Next with a non-null sentinel to ensure Put won't try to reuse it
if (Interlocked.CompareExchange(ref head.Next, Entry.Sentinel, next) != next)
goto retry;
m_Head = next;
return head.Value;
}
}
}