具有BlockingCollection的线程池

本文关键字:线程 BlockingCollection 具有 | 更新日期: 2023-09-27 18:25:00

问题:有多个线程访问一个资源。我需要将它们的数量限制为一个常数MaxThreads。无法进入线程池的线程应该得到一条错误消息。

解决方案:我开始在下面的算法中使用BlockingCollection<string> pool,但我发现BlockingCollection需要调用CompleteAdding,但我做不到,因为我总是收到传入线程(出于调试目的,我在下面的示例中硬编码为10),比如web请求。

public class MyTest {
    private const int MaxThreads = 3;
    private BlockingCollection<string> pool;
    public MyTest() { 
        pool = new BlockingCollection<string>(MaxThreads);
    }
    public void Go() {
        var addSuccess = this.pool.TryAdd(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        if (!addSuccess) Console.WriteLine(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(string.Format("Adding thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(string.Format("Pool size: {0}", pool.Count));
        // simulate work
        Thread.Sleep(1000);
        Console.WriteLine("Thread ID#{0} " + Thread.CurrentThread.ManagedThreadId + " is done doing work.");
        string val;
        var takeSuccess = this.pool.TryTake(out val);
        if (!takeSuccess) Console.WriteLine(string.Format("Failed to take out thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine("Taking out " + val);
        Console.WriteLine(string.Format("Pool size: {0}", pool.Count));
        Console.WriteLine(Environment.NewLine);
    }
}
static void Main()
{
    var t = new MyTest();
    Parallel.For(0, 10, x => t.Go());
}

关于如何更好地实现这一点,有什么想法吗?

谢谢!

附言:这里的多线程新手,如果你对阅读材料有任何建议,我将不胜感激。

LE:根据我得到的答案,我能够使用以下算法实现所需的行为:

public class MyTest {
    private const int MaxThreads = 3;
    private SemaphoreSlim semaphore;
    public MyTest() { 
        semaphore = new SemaphoreSlim(MaxThreads, MaxThreads);
    }
    public void Go() {
        Console.WriteLine(string.Format("In comes thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        semaphore.Wait();
        try {
        Console.WriteLine(string.Format("Serving thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        // simulate work
        Thread.Sleep(1000);
        Console.WriteLine(string.Format("Out goes thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        }
        finally {
            semaphore.Release();
        }
    }
}
static void Main()
{
    var t = new MyTest();
    Parallel.For(0, 10, x=> t.Go());
}

具有BlockingCollection的线程池

如果您想保护一定数量的线程,这些线程一次可以访问关键区域,您必须使用Semaphore或SemaphoreSlim。我建议后一种,与前一种相比重量轻。

SemaphoreSlim的一个缺点是它们不能跨流程工作,但我们有Semaphore可以帮助。

您可以通过框架提供的带有超时的Wait方法之一来测试Semaphore是否已满。

SemaphoreSlim semaphore = new SemaphoreSlim(3, 3);
if (!semaphore.Wait(0))
{
    //Already semaphore full.
    //Handle it as you like
}

http://www.albahari.com/threading/是一个非常好的线程资源。