使用具有多个任务/使用者的阻塞集合

本文关键字:使用者 集合 任务 | 更新日期: 2023-09-27 18:19:38

我有以下代码,我从源代码填充用户,为了示例起见,如下所示。我想做的是与多个消费者一起使用BlockingCollection。

下面是正确的方法吗?此外,线程的最佳数量是多少?好吧,这取决于硬件、内存等。或者我该如何做得更好?

下面的实现还能确保我处理集合中的所有内容,直到它为空吗?

    class Program
    {
        public static readonly BlockingCollection<User> users = new BlockingCollection<User>();
        static void Main(string[] args)
        {
            for (int i = 0; i < 100000; i++)
            {
                var u = new User {Id = i, Name = "user " + i};
                users.Add(u);
            }
            Run(); 
        }
        static void Run()
        {
            for (int i = 0; i < 100; i++)
            {
                Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
            }
        }
        static void Process()
        {
            foreach (var user in users.GetConsumingEnumerable())
            {
                Console.WriteLine(user.Id);
            }
        }
    }
    public class User
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

使用具有多个任务/使用者的阻塞集合

的一些小事情

  1. 您从未调用过CompleteAdding,如果不这样做,您消耗的foreach循环将永远不会完成并挂起。通过在初始for循环之后执行users.CompleteAdding()来修复此问题
  2. 你从不等待工作完成,Run()会让你的100个线程加速(这可能太多了,除非你的真实过程需要大量等待无争议的资源)。因为任务不是前台线程,所以当Main退出时,它们不会保持程序打开。您需要一个CountdownEvent来跟踪何时完成所有操作
  3. 在生产者完成所有工作之前,你不会启动消费者,你应该将生产者转移到一个单独的线程中,或者先启动消费者,这样当你在主线程上填充生产者时,他们就可以工作了

这是带有修复的代码的更新版本

class Program
{
    private const int MaxThreads = 100; //way to high for this example.
    private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);
    public static readonly BlockingCollection<User> users = new BlockingCollection<User>();
    static void Main(string[] args)
    {
        Run(); 
        for (int i = 0; i < 100000; i++)
        {
            var u = new User {Id = i, Name = "user " + i};
            users.Add(u);
        }
        users.CompleteAdding();
        cde.Wait();
    }
    static void Run()
    {
        for (int i = 0; i < MaxThreads; i++)
        {
            Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
        }
    }
    static void Process()
    {
        foreach (var user in users.GetConsumingEnumerable())
        {
            Console.WriteLine(user.Id);
        }
        cde.Signal();
    }
}
public class User
{
    public int Id { get; set; }
    public string Name { get; set; }
}

对于我前面说过的"最佳线程数",这实际上取决于你在等待什么

如果您正在处理的是CPU限制,那么线程的最佳数量可能是Enviorment.ProcessorCount.

如果您正在等待外部资源,但新请求不会影响旧请求(例如,向20个不同的服务器询问信息,服务器n上的负载不会影响服务器n+1上的负载),在这种情况下,我会让Parallel.ForEach只为您选择线程数。

如果您正在等待被争用的资源(例如,对硬盘的读/写),您将希望根本不使用太多线程(甚至可能只使用一个)。我刚刚发布了另一个问题的答案,当从硬盘读取时,一次只能使用一个线程,这样硬盘就不会为了一次完成所有读取而到处乱跳。