线程池按顺序运行

本文关键字:运行 顺序 线程 | 更新日期: 2023-09-27 18:29:01

我正在编写一个函数,该函数获取目录中的所有文件,但通过将每个子目录添加到线程池来并行执行。我认为这意味着每个目录都将被并行遍历,并且由于有许多子目录,所以它会比按顺序遍历快得多。我的代码如下:

    private object addlock = new object();
    private void addFiles(string[] newFiles)
    {
        lock (addlock) {
            files.AddRange( newFiles );
            Console.WriteLine( "Added {0} files", newFiles.Length );
        }
    }
    private void getFilesParallel(string dir)
    {
        if (!Directory.Exists( dir )) {
            return;
        }
        string[] dirs = Directory.GetDirectories( dir, "*", SearchOption.TopDirectoryOnly );
        ManualResetEvent mre = new ManualResetEvent( false );
        ThreadPool.QueueUserWorkItem( (object obj) =>
        {
            addFiles( Directory.GetFiles( dir, "*", SearchOption.TopDirectoryOnly ) );
            mre.Set();
        } );
        Process currentProcess = Process.GetCurrentProcess();
        long memorySize = currentProcess.PrivateMemorySize64;
        Console.WriteLine( "Used {0}", memorySize );
        foreach (string str in dirs) {
            getFilesParallel( str );
        }
        mre.WaitOne();
    }

问题是我得到的输出是这样的:

Added 34510 files
Used 301420544
Added 41051 files
Used 313937920
Added 39093 files
Used 322764800
Added 44426 files
Used 342536192
Added 30772 files
Used 350728192
Added 36262 files
Used 360329216
Added 31686 files
Used 368685056
Added 33194 files
Used 374894592
Added 34486 files
Used 384057344
Added 37298 files
Used 393998336

这表明我的代码是按顺序运行的,正如我所期望的那样,当它们在不同的线程上运行时,会发现每个语句都是成块的。我已经使用不同的文件夹运行了好几次,结果总是一样的。为什么这是按顺序运行的?

线程池按顺序运行

您只有一个物理磁盘驱动器。磁盘的磁头一次只能位于一个位置。你同时要求它提供两条信息,这不允许它同时出现在两个地方。

程序中有一些少量的CPU绑定工作实际上可以并行化,但这不是主要的瓶颈。

如果您有多个物理磁盘驱动器,并且每个驱动器上都有数据,那么您可以并行访问每个驱动器上的数据,并且实际上可以并行完成有问题的工作。

要准确地对其进行基准测试有些困难,因为只要有足够的内存,第一次运行就会缓存数据,随后对同一文件夹的枚举可能根本不访问磁盘。

同样值得考虑的是,如果你有一个SSD,它将从并行操作中受益更多,因为它支持更多的IOPS,因为它没有任何移动部件需要等待。

这段代码显示,在我的四核i5上,当在SSD上运行时,或者当数据已经缓存时,并行速度可能比单线程快2-3倍。

它演示了Parallel.ForEach的使用,它可以减轻任务并行性带来的很多痛苦。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
namespace FilesReader
{
    class Program
    {
        static void Main(string[] args)
        {
            string path = args[0];
            RunTrial(path, false);
            RunTrial(path, true);
        }
        private static void RunTrial(string path, bool useParallel)
        {
            Console.WriteLine("Parallel: {0}", useParallel);
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            FileListing listing = new FileListing(path, useParallel);
            stopwatch.Stop();
            Console.WriteLine("Added {0} files in {1} ms ({2} file/second)", 
                listing.Files.Count, stopwatch.ElapsedMilliseconds, 
                (listing.Files.Count * 1000 / stopwatch.ElapsedMilliseconds));
        }
    }
    class FileListing
    {
        private ConcurrentList<string> _files;
        private bool _parallelExecution;
        public FileListing(string path, bool parallelExecution)
        {
            _parallelExecution = parallelExecution;
            _files = new ConcurrentList<string>();
            BuildListing(path);
        }
        public ConcurrentList<string> Files
        {
            get { return _files; }
        }
        private void BuildListing(string path)
        {
            string[] dirs = null;
            string[] files = null;
            bool success = false;
            try
            {
                dirs = Directory.GetDirectories(path, "*", SearchOption.TopDirectoryOnly);
                files = Directory.GetFiles(path);
                success = true;
            }
            catch (SystemException) { /* Suppress security exceptions etc*/ }
            if (success)
            {
                Files.AddRange(files);
                if (_parallelExecution)
                {
                    Parallel.ForEach(dirs, d => BuildListing(d));
                }
                else
                {
                    foreach (string dir in dirs)
                    {
                        BuildListing(dir);
                    }
                }
            }
        }
    }
    class ConcurrentList<T>
    {
        object lockObject = new object();
        List<T> list;
        public ConcurrentList()
        {
            list = new List<T>();
        }
        public void Add(T item)
        {
            lock (lockObject) list.Add(item);
        }
        public void AddRange(IEnumerable<T> collection)
        {
            lock (lockObject) list.AddRange(collection);
        }
        public long Count
        {
            get { lock (lockObject) return list.Count; }
        }
    }
}

我曾考虑使用Concurrent集合,而不是滚动线程安全列表实现,但事实证明它们的速度慢了5%左右。