多线程:限制并发线程
本文关键字:并发 线程 多线程 | 更新日期: 2023-09-27 17:56:28
我需要开发一个使用多线程的应用程序。
基本上,我有一个包含大约 200k 行的数据表。从每一行中,我需要取一个字段,将其与网页进行比较,,然后将其从数据表中删除。
问题是,为这些页面提供服务的服务器对并发请求有限制。所以我最多可以同时要求 3 页。
我想通过使用线程池来做到这一点,我什至设法构建了一个简单的应用程序来做到这一点(锁定数据表)但我无法限制并发线程(即使使用 SetMaxThreads),似乎它只是忽略了限制。
有没有人有现成的东西可以做类似的事情?我很想看看。
我尝试使用信号量,但遇到了问题:
static SemaphoreSlim _sem = new SemaphoreSlim(3); // Capacity of 3
static List<string> records = new List<string>();
static void Main()
{
records.Add("aaa");
records.Add("bbb");
records.Add("ccc");
records.Add("ddd");
records.Add("eee");
records.Add("fff");
records.Add("ggg");
records.Add("iii");
records.Add("jjj");
for (int i = 0; i < records.Count; i++ )
{
new Thread(ThreadJob).Start(records[i]);
}
Console.WriteLine(records.Count);
Console.ReadLine();
}
static void ThreadJob(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.Wait();
Console.WriteLine(id + " is in!"); // Only three threads
//Thread.Sleep(1000 * (int)id); // can be here at
Console.WriteLine(id + " is leaving"); // a time.
lock (records)
{
records.Remove((string)id);
}
_sem.Release();
}
这运行得很好,唯一的问题是,
Console.WriteLine(records.count);
返回不同的结果。即使到期,我也明白这是因为并非所有线程都已完成(在所有记录被删除之前调用 Records.Count),我找不到如何等待所有线程完成。
要等待多个线程完成,您可以使用多个EventWaitHandle
,然后调用 WaitHandle.WaitAll
来阻止主线程,直到所有事件都发出信号:
// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();
for (int i = 0; i < records.Count; i++ )
{
// for each job, create an event and add it to the list
var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
finishEvents.Add(signal);
// we need to catch the id in a separate variable
// for the closure to work as expected
var id = records[i];
var thread = new Thread(() =>
{
// do the job
ThreadJob(id);
// signal the main thread
signal.Set();
});
}
WaitHandle.WaitAll(finishEvents.ToArray());
由于这些线程中的大多数最终会在大部分时间暂停,因此在这种情况下最好使用 ThreadPool
,因此您可以将new Thread
替换为:
ThreadPool.QueueUserWorkItem(s =>
{
ThreadJob(id);
signal.Set();
});
完成事件后,不要忘记处理它们:
foreach (var evt in finishEvents)
{
evt.Dispose();
}
[编辑]
为了将它们全部放在一个地方,下面是您的示例代码应该如下所示:
static Semaphore _sem = new Semaphore(3, 3); // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });
static void Main()
{
var finishEvents = new List<EventWaitHandle>();
for (int i = 0; i < _records.Count; i++)
{
var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
finishEvents.Add(signal);
var id = _records[i];
var t = new Thread(() =>
{
ThreadJob(id);
signal.Set();
});
t.Start();
}
WaitHandle.WaitAll(finishEvents.ToArray());
Console.WriteLine(_records.Count);
Console.ReadLine();
}
static void ThreadJob(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.WaitOne();
Console.WriteLine(id + " is in!");
Thread.Sleep(1000);
Console.WriteLine(id + " is leaving");
lock (_records)
{
_records.Remove((string)id);
}
_sem.Release();
}
(请注意,我使用了Semaphore
而不是SemaphoreSlim
,因为我在这台机器上没有 .NET 4,我想在更新答案之前测试代码)
为什么不使用并行扩展 - 这将使事情变得容易得多。
无论如何,您可能想看的是信号量之类的东西。一两个月前,我写了一篇关于这个主题的博客文章,你可能会觉得有用:https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/
您可以使用信号量(如果您未.net 3.5
或
信号量苗条 .net 4.0
首先,应该 Console.WriteLine(id + " is reaving");不是再晚一点,在锁之后,就在它释放信号灯之前?
至于实际等待所有线程完成,从长远来看,Groo的答案看起来更好,更健壮,但作为这段特定代码的更快/更简单的解决方案,我认为您也可以通过调用来侥幸逃脱。Join() 按顺序在您要等待的所有线程上。
static List<Thread> ThreadList = new List<Thread>(); // To keep track of them
然后在启动线程时,将当前新的线程行替换为:
ThreadList.Add(new Thread(ThreadJob).Start(records[i]));
然后在 Console.WriteLine 之前:
foreach( Thread t in ThreadList )
{
t.Join();
}
如果任何线程没有终止,这将锁定,如果你想知道哪些线程还没有完成,这种方法将不起作用。