生产者/消费者线程池支持主线程-很少死锁

本文关键字:线程 死锁 支持 消费者 生产者 | 更新日期: 2023-09-27 18:06:58

我有一个c#线程池类,它很大程度上基于https://stackoverflow.com/a/1656662/782181的生产者/消费者代码。注意:我这样做,而不是使用BlockingCollection,因为我坚持使用。net 2.0!

我在类中添加了一个可以从主线程调用的函数,以允许主线程做一些工作。我的想法是,在某些时候,主线程等待工作完成,但与其等待,我还可以让主线程做一些工作来加快速度。

下面是这个类的精简版来演示:

public static class SGThreadPool
{
    // Shared object to lock access to the queue between threads.
    private static object locker = new object();
    // The various threads that are doing our work.
    private static List<Thread> workers = null;
    // A queue of tasks to be completed by the workers.
    private static Queue<object> taskQueue = new Queue<object>();
    private static Queue<WaitCallback> taskCallbacks = new Queue<WaitCallback>();
    //OMMITTED: Init function (starts threads)
    // Enqueues a task for a thread to do.
    public static void EnqueueTask(WaitCallback callback, object context)
    {
        lock(locker)
        {
            taskQueue.Enqueue(context);
            taskCallbacks.Enqueue(callback);
            Monitor.PulseAll(locker); //Q: should I just use 'Pulse' here?
        }
    }
    // Can be called from main thread to have it "help out" with tasks.
    public static bool PerformTask()
    {
        WaitCallback taskCallback = null;
        object task = null;
        lock(locker)
        {
            if(taskQueue.Count > 0)
            {
                task = taskQueue.Dequeue();
            }
            if(taskCallbacks.Count > 0)
            {
                taskCallback = taskCallbacks.Dequeue();
            }
        }
        // No task means no work, return false.
        if(task == null || taskCallback == null) { return false; }
        // Do the work!
        taskCallback(task);
        return true;
    }
    private static void Consume()
    {
        while(true)
        {
            WaitCallback taskCallback = null;
            object task = null;
            lock(locker)
            {
                // While no tasks in the queue, wait.
                while(taskQueue.Count == 0)
                {
                    Monitor.Wait(locker);
                }
                // Get a task.
                task = taskQueue.Dequeue();
                taskCallback = taskCallbacks.Dequeue();
            }
            // Null task signals an exit.
            if(task == null || taskCallback == null) { return; }
            // Call consume callback with task as context.
            taskCallback(task);
        }
    }
}

基本上,我可以将一些任务排队给后台线程执行。但是主线程也可以通过调用PerformTask()来接受任务并执行它。

我遇到了一个罕见的问题,主线程将尝试在PerformTask()中"锁定",但锁已经被占用了。主线程在等待,但是由于某些原因,锁永远不会变得可用。

代码中没有任何问题引起死锁-我希望其他人可能能够发现问题。我一直在看这个几个小时,我不知道为什么主线程会卡在PerformTask()中的"lock()"调用。似乎没有其他线程会无限期地持有锁?允许主线程以这种方式与池交互是一个坏主意吗?

生产者/消费者线程池支持主线程-很少死锁

嗯,所以,虽然我仍然想知道为什么上面的代码在某些情况下会死锁,但我想我已经找到了一个解决方法。

如果主线程要在这里工作,我想确保主线程不会被阻塞很长一段时间。毕竟,一般的开发规则是:不要阻塞主线程!

所以,我正在尝试的解决方案是使用Monitor。直接尝试输入,而不是对主线程使用lock()。这允许我指定一个超时时间,主线程愿意等待多长时间的锁。

public static bool PerformTask()
{
    WaitCallback taskCallback = null;
    object task = null;
    // Use TryEnter, rather than "lock" because 
    // it allows us to specify a timeout as a failsafe.
    if(Monitor.TryEnter(locker, 500))
    {
        try 
        {
            // Pull a task from the queue.
            if(taskQueue.Count > 0)
            {
                task = taskQueue.Dequeue();
            }
            if(taskCallbacks.Count > 0)
            {
                taskCallback = taskCallbacks.Dequeue();
            }
        }
        finally
        {
            Monitor.Exit(locker);
        }
    }
    // No task means no work, return false.
    if(task == null || taskCallback == null) { return false; }
    // Do the work!
    taskCallback(task);
    return true;
}

在这段代码中,线程将等待获取锁的时间长达500ms。如果它因为某种原因不能,它就无法完成任何任务,但至少它不会被卡住。不把主线程放在可以无限期等待的位置似乎是个好主意。

我相信当你使用lock()时,编译器无论如何都会生成类似的代码,所以我认为这种解决方案不会有任何性能问题。