如何知道我的所有生产者-消费者工作何时完成

本文关键字:消费者 工作 何时完 生产者 何知道 我的 | 更新日期: 2023-09-27 18:21:19

我有Winforms应用程序,它读取几个网络文件夹并搜索这些文件夹中的文件,该功能接收List<stirng> folders:

private decimal _numberOfFiles;
private static List<string> _folders;
public delegate void OnFileAddDelegate(List<string> files);
public event OnFileAddDelegate OnFileAddEventHandler;
public delegate void OnFinishSearchDelegate();
public event OnFinishSearchDelegate OnFinishSearchEventHandler;
public void SearchFiles()
{
    foreach (string folder in _folders)
    {
        if (Directory.Exists(folder))
        {
            var files = Directory.EnumerateFiles(folder, "*.doc", SearchOption.TopDirectoryOnly)
                .OrderByDescending(x => new FileInfo(x).CreationTime).Take((int)_numberOfFiles).ToList<string>();
            if (OnFileAddEventHandler != null)
                OnFileAddEventHandler(files);
        }
    }
    if (OnFinishSearchEventHandler != null)
        OnFinishSearchEventHandler();
}

OnFileAddEventHandler(files)事件被激发后,我的ProducerConsumer类开始检查找到并执行工作的文件的List(如果文件正常,则将事件激发到我的主UI,将这些文件添加到我的ListView中):

public class ProducerConsumer
{
    public delegate void OnFileAddDelegate(PcapFileDetails pcapFileDetails);
    public event OnFileAddDelegate OnFileAddEventHandler;
    public delegate void AllFilesProcessedDelegate();
    public event AllFilesProcessedDelegate AllFilesProcessedEventHandler;
    private readonly Queue<string> _queue;
    private int counter;
    public ProducerConsumer(int workerCount, IEnumerable<string> list)
    {
        _isSearchFinished = true;
        _queue = new Queue<string>(list); // fill the queue
        counter = _queue.Count; // set up counter
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consumer);
    }
    private void Consumer()
    {
        FileChecker fileChecker = new FileChecker();
        for (; ; )
        {
            string file;
            lock (_queue)
            {
                // synchronize on the queue
                if (_queue.Count == 0) return;  // we are done
                file = _queue.Dequeue(); // get file name to process
            } // release the lock to allow other consumers to access the queue
            // do the job
            string result = fileChecker.Check(file); // Check my file
            if (OnFileAddEventHandler != null && result ) // In case my file OK, fired up event to my main UI
                OnFileAddEventHandler(file);
            // decrement the counter
            if (Interlocked.Decrement(ref counter) != 0)
                continue; // not the last
            // all done - we were the last
            if (AllFilesProcessedEventHandler != null)
                AllFilesProcessedEventHandler();
            return;
        }
    }
}

现在,在这个搜索过程中,我的UI被锁定,以防止不必要的点击,我想知道我的所有文件夹何时完成搜索以解锁。但我的问题是,因为我正在搜索几个文件夹,事件AllFilesProcessedEventHandler()启动了好几次,我想知道我的所有搜索何时完成。

如何知道我的所有生产者-消费者工作何时完成

以下是QuickIO.Net 的递归示例

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using SchwabenCode.QuickIO;
namespace ConsoleApplication3
{
    internal class Program
    {
        private static readonly BlockingCollection<QuickIOFileInfo> fileInfos = new BlockingCollection<QuickIOFileInfo>();
        private static void Main(string[] args)
        {
            var task = Task.Factory.StartNew(() =>
            {
                Int32 totalSize = 0;
                Parallel.ForEach(fileInfos.GetConsumingEnumerable(), fi =>
                {
                    Interlocked.Add(ref totalSize, (int)fi.Bytes);
                });
                Console.WriteLine("All docs bytes amount to {0}", totalSize);
            });
            ProcessDirectory("C:''");
            fileInfos.CompleteAdding();
            Task.WaitAll(task);
        }
        private static void ProcessDirectory(string path)
        {
            Parallel.ForEach(QuickIODirectory.EnumerateDirectories(path), dir =>
            {
                try
                {
                    Parallel.ForEach(QuickIODirectory.EnumerateFiles(dir), file =>
                    {
                        if (file.AsFileInfo().Extension.Equals(".docx"))
                            fileInfos.Add(file);
                    });
                    ProcessDirectory(dir.FullName);
                }
                catch (Exception e)
                {
                    Console.WriteLine("Unable to access directory {0}", dir.FullName);
                }
            });
        }
    }
}

当添加完所有元素后,阻塞集合将通过调用CompleteAdding()自动向Parallel ForEach发出信号。

要扫描256GB SSD,还剩74GB,总共738k+个文件需要16.8秒。