多线程:限制并发线程

本文关键字:并发 线程 多线程 | 更新日期: 2023-09-27 17:56:28

我需要开发一个使用多线程的应用程序。

基本上,我有一个包含大约 200k 行的数据表。从每一行中,我需要取一个字段,将其与网页进行比较,,然后将其从数据表中删除。

问题是,为这些页面提供服务的服务器对并发请求有限制。所以我最多可以同时要求 3 页。

我想通过使用线程池来做到这一点,我什至设法构建了一个简单的应用程序来做到这一点(锁定数据表)但我无法限制并发线程(即使使用 SetMaxThreads),似乎它只是忽略了限制。

有没有人有现成的东西可以做类似的事情?我很想看看。

我尝试使用信号量,但遇到了问题:

        static SemaphoreSlim _sem = new SemaphoreSlim(3);    // Capacity of 3
    static List<string> records = new List<string>();
    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");
        for (int i = 0; i < records.Count; i++ )
        {
            new Thread(ThreadJob).Start(records[i]);
        }
        Console.WriteLine(records.Count);
        Console.ReadLine();
    }
    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");           // Only three threads
        //Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.
        lock (records)
        {
            records.Remove((string)id);
        }
        _sem.Release();
    }

这运行得很好,唯一的问题是,

Console.WriteLine(records.count);

返回不同的结果。即使到期,我也明白这是因为并非所有线程都已完成(在所有记录被删除之前调用 Records.Count),我找不到如何等待所有线程完成。

多线程:限制并发线程

要等待多个线程完成,您可以使用多个EventWaitHandle,然后调用 WaitHandle.WaitAll 来阻止主线程,直到所有事件都发出信号:

// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();
for (int i = 0; i < records.Count; i++ )
{
    // for each job, create an event and add it to the list
    var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
    finishEvents.Add(signal);
    // we need to catch the id in a separate variable
    // for the closure to work as expected
    var id = records[i];
    var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);
            // signal the main thread
            signal.Set();
        });
}
WaitHandle.WaitAll(finishEvents.ToArray());

由于这些线程中的大多数最终会在大部分时间暂停,因此在这种情况下最好使用 ThreadPool,因此您可以将new Thread替换为:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

完成事件后,不要忘记处理它们:

foreach (var evt in finishEvents)
{
    evt.Dispose();
}

[编辑]

为了将它们全部放在一个地方,下面是您的示例代码应该如下所示:

static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });
static void Main()
{
    var finishEvents = new List<EventWaitHandle>();
    for (int i = 0; i < _records.Count; i++)
    {
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);
        var id = _records[i];
        var t = new Thread(() =>
        {
            ThreadJob(id);
            signal.Set();
        });
        t.Start();
    }
    WaitHandle.WaitAll(finishEvents.ToArray());
    Console.WriteLine(_records.Count);
    Console.ReadLine();
}
static void ThreadJob(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.WaitOne();
    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000);
    Console.WriteLine(id + " is leaving");
    lock (_records)
    {
        _records.Remove((string)id);
    }
    _sem.Release();
}

(请注意,我使用了Semaphore而不是SemaphoreSlim,因为我在这台机器上没有 .NET 4,我想在更新答案之前测试代码)

为什么不使用并行扩展 - 这将使事情变得容易得多。

无论如何,您可能想看的是信号量之类的东西。一两个月前,我写了一篇关于这个主题的博客文章,你可能会觉得有用:https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/

您可以使用信号量(如果您未.net 3.5

信号量苗条 .net 4.0

首先,应该 Console.WriteLine(id + " is reaving");不是再晚一点,在锁之后,就在它释放信号灯之前?

至于实际等待所有线程完成,从长远来看,Groo的答案看起来更好,更健壮,但作为这段特定代码的更快/更简单的解决方案,我认为您也可以通过调用来侥幸逃脱。Join() 按顺序在您要等待的所有线程上。

static List<Thread> ThreadList = new List<Thread>(); // To keep track of them

然后在启动线程时,将当前新的线程行替换为:

ThreadList.Add(new Thread(ThreadJob).Start(records[i]));

然后在 Console.WriteLine 之前:

foreach( Thread t in ThreadList )
{
    t.Join();
}

如果任何线程没有终止,这将锁定,如果你想知道哪些线程还没有完成,这种方法将不起作用。