How do I know when all of my producer-consumer jobs have finished?
Updated: 2023-09-27 18:21:19
I have a WinForms application that reads several network folders and searches for files in them. The search function receives a List<string> folders:
private decimal _numberOfFiles;
private static List<string> _folders;

public delegate void OnFileAddDelegate(List<string> files);
public event OnFileAddDelegate OnFileAddEventHandler;
public delegate void OnFinishSearchDelegate();
public event OnFinishSearchDelegate OnFinishSearchEventHandler;

public void SearchFiles()
{
    foreach (string folder in _folders)
    {
        if (Directory.Exists(folder))
        {
            // Take the newest *.doc files in this folder, up to _numberOfFiles
            var files = Directory.EnumerateFiles(folder, "*.doc", SearchOption.TopDirectoryOnly)
                .OrderByDescending(x => new FileInfo(x).CreationTime)
                .Take((int)_numberOfFiles)
                .ToList();

            // One event per folder, carrying that folder's files
            if (OnFileAddEventHandler != null)
                OnFileAddEventHandler(files);
        }
    }

    // Raised once, after every folder has been enumerated
    if (OnFinishSearchEventHandler != null)
        OnFinishSearchEventHandler();
}
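After the OnFileAddEventHandler(files) event fires, my ProducerConsumer class starts checking the List of files that were found and does its work (if a file is OK, it raises an event to my main UI, which adds the file to my ListView):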
public class ProducerConsumer
{
    public delegate void OnFileAddDelegate(PcapFileDetails pcapFileDetails);
    public event OnFileAddDelegate OnFileAddEventHandler;
    public delegate void AllFilesProcessedDelegate();
    public event AllFilesProcessedDelegate AllFilesProcessedEventHandler;

    private readonly Queue<string> _queue;
    private int counter;

    public ProducerConsumer(int workerCount, IEnumerable<string> list)
    {
        _isSearchFinished = true;
        _queue = new Queue<string>(list); // fill the queue
        counter = _queue.Count; // set up counter
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consumer);
    }

    private void Consumer()
    {
        FileChecker fileChecker = new FileChecker();
        for (; ; )
        {
            string file;
            lock (_queue) // synchronize on the queue
            {
                if (_queue.Count == 0) return; // we are done
                file = _queue.Dequeue(); // get file name to process
            } // release the lock to allow other consumers to access the queue

            // do the job
            string result = fileChecker.Check(file); // check my file
            if (OnFileAddEventHandler != null && result) // if my file is OK, raise the event to my main UI
                OnFileAddEventHandler(file);

            // decrement the counter
            if (Interlocked.Decrement(ref counter) != 0)
                continue; // not the last

            // all done - we were the last
            if (AllFilesProcessedEventHandler != null)
                AllFilesProcessedEventHandler();
            return;
        }
    }
}
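During this search my UI is locked to prevent unwanted clicks, and I want to know when all of my folders have finished being searched so that I can unlock it. My problem is that, because I search several folders, the AllFilesProcessedEventHandler() event fires several times, and I want to know when all of my searches have finished.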
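To make the symptom concrete, here is a minimal sketch, not the code from the question. It assumes one processor instance is created per folder, as the description above suggests. `BatchProcessor`, `Demo` and `CountCompletionSignals` are hypothetical names, and the file check is replaced by a no-op. Because each instance only counts its own files, the "all processed" callback runs once per folder rather than once overall.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a per-folder ProducerConsumer instance.
public class BatchProcessor
{
    private readonly Queue<string> _queue;
    private readonly Action _onBatchDone;
    private int _remaining;

    public BatchProcessor(IEnumerable<string> files, Action onBatchDone)
    {
        _queue = new Queue<string>(files);
        _remaining = _queue.Count;
        _onBatchDone = onBatchDone;
    }

    public Task[] Start(int workerCount)
    {
        var workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
            workers[i] = Task.Run(() => Consume());
        return workers;
    }

    private void Consume()
    {
        while (true)
        {
            lock (_queue)
            {
                if (_queue.Count == 0) return;
                _queue.Dequeue(); // the real file check would happen after this
            }
            // This counter only knows about its own batch, not the other folders.
            if (Interlocked.Decrement(ref _remaining) == 0)
                _onBatchDone();
        }
    }
}

public static class Demo
{
    // Returns how many times the "all processed" callback ran.
    public static int CountCompletionSignals(int folderCount, int filesPerFolder)
    {
        int signals = 0;
        var allWorkers = new List<Task>();
        for (int f = 0; f < folderCount; f++)
        {
            var files = new List<string>();
            for (int i = 0; i < filesPerFolder; i++)
                files.Add("folder" + f + "/file" + i + ".doc");

            var batch = new BatchProcessor(files, () => Interlocked.Increment(ref signals));
            allWorkers.AddRange(batch.Start(2));
        }
        Task.WaitAll(allWorkers.ToArray());
        return signals;
    }
}
```

With three non-empty folders, `Demo.CountCompletionSignals(3, 5)` returns 3: one signal per batch, which matches the repeated event described above.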
Here is a recursive example using QuickIO.Net:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using SchwabenCode.QuickIO;

namespace ConsoleApplication3
{
    internal class Program
    {
        private static readonly BlockingCollection<QuickIOFileInfo> fileInfos = new BlockingCollection<QuickIOFileInfo>();

        private static void Main(string[] args)
        {
            // Consumer: sums file sizes until the collection is marked complete
            var task = Task.Factory.StartNew(() =>
            {
                long totalSize = 0; // long, so the total for a whole drive does not overflow
                Parallel.ForEach(fileInfos.GetConsumingEnumerable(), fi =>
                {
                    Interlocked.Add(ref totalSize, (long)fi.Bytes);
                });
                Console.WriteLine("All docs bytes amount to {0}", totalSize);
            });

            // Producer: walks the directory tree and adds matching files
            ProcessDirectory(@"C:\");
            fileInfos.CompleteAdding(); // no more items will be added
            Task.WaitAll(task);
        }

        private static void ProcessDirectory(string path)
        {
            Parallel.ForEach(QuickIODirectory.EnumerateDirectories(path), dir =>
            {
                try
                {
                    Parallel.ForEach(QuickIODirectory.EnumerateFiles(dir), file =>
                    {
                        if (file.AsFileInfo().Extension.Equals(".docx"))
                            fileInfos.Add(file);
                    });
                    ProcessDirectory(dir.FullName); // recurse into subdirectories
                }
                catch (Exception)
                {
                    Console.WriteLine("Unable to access directory {0}", dir.FullName);
                }
            });
        }
    }
}
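Once all elements have been added, calling CompleteAdding() makes the blocking collection signal the Parallel.ForEach that no more items are coming, so the consuming enumeration ends by itself.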
Scanning a 256 GB SSD with 74 GB free and 738k+ files in total took 16.8 seconds.
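Applied to the scenario in the question, the same CompleteAdding() idea might look like the sketch below, which uses only the base class library. It assumes every folder's files go into one shared BlockingCollection<string> instead of one queue per folder. `SingleCompletionSketch`, `ProcessAllAsync` and the `checkFile` delegate (a stand-in for the question's FileChecker.Check) are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SingleCompletionSketch
{
    // fileBatches: one sequence of file names per folder.
    // checkFile: hypothetical stand-in for the real file check.
    public static async Task<List<string>> ProcessAllAsync(
        IEnumerable<IEnumerable<string>> fileBatches,
        int workerCount,
        Func<string, bool> checkFile)
    {
        var accepted = new ConcurrentQueue<string>();
        using (var queue = new BlockingCollection<string>())
        {
            var tasks = new List<Task>();

            // Consumers: each loop ends once the queue is empty AND marked complete.
            for (int i = 0; i < workerCount; i++)
            {
                tasks.Add(Task.Run(() =>
                {
                    foreach (string file in queue.GetConsumingEnumerable())
                    {
                        if (checkFile(file))
                            accepted.Enqueue(file);
                    }
                }));
            }

            // Producer: every folder feeds the same queue.
            tasks.Add(Task.Run(() =>
            {
                try
                {
                    foreach (IEnumerable<string> batch in fileBatches)
                        foreach (string file in batch)
                            queue.Add(file);
                }
                finally
                {
                    // Called exactly once, after the last folder (or on failure),
                    // so the consumers are never left waiting.
                    queue.CompleteAdding();
                }
            }));

            // Completes once, when the producer and all consumers are done.
            await Task.WhenAll(tasks);
        }
        return new List<string>(accepted);
    }
}
```

In a WinForms event handler marked async, awaiting `ProcessAllAsync(...)` resumes on the UI thread, so the code that unlocks the UI can sit directly after the await and runs only once, regardless of how many folders were searched.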