使用具有多个任务/使用者的阻塞集合
本文关键字:使用者 集合 任务 | 更新日期: 2023-09-27 18:19:38
我有以下代码,我从源代码填充用户,为了示例起见,如下所示。我想做的是与多个消费者一起使用BlockingCollection。
下面是正确的方法吗?此外,线程的最佳数量是多少?好吧,这取决于硬件、内存等。或者我该如何做得更好?
下面的实现还能确保我处理集合中的所有内容,直到它为空吗?
class Program
{
public static readonly BlockingCollection<User> users = new BlockingCollection<User>();
static void Main(string[] args)
{
for (int i = 0; i < 100000; i++)
{
var u = new User {Id = i, Name = "user " + i};
users.Add(u);
}
Run();
}
static void Run()
{
for (int i = 0; i < 100; i++)
{
Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
}
}
static void Process()
{
foreach (var user in users.GetConsumingEnumerable())
{
Console.WriteLine(user.Id);
}
}
}
public class User
{
public int Id { get; set; }
public string Name { get; set; }
}
的一些小事情
- 您从未调用过CompleteAdding,如果不这样做,您消耗的foreach循环将永远不会完成并挂起。通过在初始
for
循环之后执行users.CompleteAdding()
来修复此问题 - 你从不等待工作完成,
Run()
会让你的100个线程加速(这可能太多了,除非你的真实过程需要大量等待无争议的资源)。因为任务不是前台线程,所以当Main
退出时,它们不会保持程序打开。您需要一个CountdownEvent来跟踪何时完成所有操作 - 在生产者完成所有工作之前,你不会启动消费者,你应该将生产者转移到一个单独的线程中,或者先启动消费者,这样当你在主线程上填充生产者时,他们就可以工作了
这是带有修复的代码的更新版本
class Program
{
private const int MaxThreads = 100; //way to high for this example.
private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);
public static readonly BlockingCollection<User> users = new BlockingCollection<User>();
static void Main(string[] args)
{
Run();
for (int i = 0; i < 100000; i++)
{
var u = new User {Id = i, Name = "user " + i};
users.Add(u);
}
users.CompleteAdding();
cde.Wait();
}
static void Run()
{
for (int i = 0; i < MaxThreads; i++)
{
Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
}
}
static void Process()
{
foreach (var user in users.GetConsumingEnumerable())
{
Console.WriteLine(user.Id);
}
cde.Signal();
}
}
public class User
{
public int Id { get; set; }
public string Name { get; set; }
}
对于我前面说过的"最佳线程数",这实际上取决于你在等待什么
如果您正在处理的是CPU限制,那么线程的最佳数量可能是Enviorment.ProcessorCount.
如果您正在等待外部资源,但新请求不会影响旧请求(例如,向20个不同的服务器询问信息,服务器n
上的负载不会影响服务器n+1
上的负载),在这种情况下,我会让Parallel.ForEach只为您选择线程数。
如果您正在等待被争用的资源(例如,对硬盘的读/写),您将希望根本不使用太多线程(甚至可能只使用一个)。我刚刚发布了另一个问题的答案,当从硬盘读取时,一次只能使用一个线程,这样硬盘就不会为了一次完成所有读取而到处乱跳。