正在读取大量文件“;同时”;
本文关键字:同时 文件 读取 | 更新日期: 2023-09-27 18:25:25
我使用FileSystemWatcher
是为了捕获文件夹中任何文件上的每个created
、changed
、deleted
和renamed
更改。
通过这些更改,我需要对这些文件的内容执行简单的校验和。简单地说,我打开一个文件流并将其传递给MD5类:
private byte[] calculateChecksum(string frl)
{
using (FileStream stream = File.Open(frl, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
return this.md5.ComputeHash(stream);
}
}
问题在于我需要处理的文件数量。例如,假设我在一个文件夹中创建了200个文件,然后我将所有文件复制并粘贴到同一个文件夹上。此操作将导致200个事件和200个calculateChecksum()
执行。
我该如何解决这类问题?
在FileSystemWatcher
中,处理程序将任务放入将由某个工作人员处理的队列中。Worker可以以目标速度或/和频率处理校验和计算任务。也许一个工作人员会更好,因为许多读者可以通过多次阅读来减慢hdd的速度。
尝试阅读有关BlockingCollection的信息:https://msdn.microsoft.com/ru-ru/library/dd997371(v=vs.110).aspx
和生产者-消费者数据流模式https://msdn.microsoft.com/ru-ru/library/hh228601(v=vs.110).aspx
var workerCount = 2;
BlockingCollection<String>[] filesQueues= new BlockingCollection<String>[workerCount];
for(int i = 0; i < workerCount; i++)
{
filesQueues[i] = new BlockingCollection<String>(500);
// Worker
Task.Run(() =>
{
while (!filesQueues[i].IsCompleted)
{
string url;
try
{
url= filesQueues[i].Take();
}
catch (InvalidOperationException) { }
if (!string.IsNullOrWhiteSpace(url))
{
calculateChecksum(url);
}
}
}
}
//FileSystemWatcher处理程序内部
var queueIndex = hash(filename) % workersCount
// Warning!!
// Blocks if numbers.Count == dataItems.BoundedCapacity
filesQueues[queueIndex].Add(fileName);
filesQueues[queueIndex].CompleteAdding();
此外,您还可以创建多个消费者,只需同时调用Take或TryTake-每个商品将只由一个消费者消费。但考虑到在这种情况下,一个文件可以由许多工作人员处理,而多个hdd读取器可能会减慢hdd的速度。
如果有多个工作人员,则最好制作多个BlockingCollections,并使用索引将文件推送到队列中:
我已经创建了一个消费者生产者模式来解决这个问题,并且我尝试使用线程池来平滑大量的工作,共享BlockingCollection
阻止集合&线程池:
private BlockingCollection<Index.ResourceIndexDocument> documents;
this.pool = new SmartThreadPool(SmartThreadPool.DefaultIdleTimeout, 4);
this.documents = new BlockingCollection<string>();
正如您所看到的,我已经创建了一个I treadPool,将并发性设置为4。因此,无论池中是否有x > 4
工作的单元要处理,在同一时间只有4个线程在工作。
生产者:
public void warn(string channel, string frl)
{
this.pool.QueueWorkItem<string, string>(
(file) => this.files.Add(file),
channel,
frl
);
}
消费者:
Task.Factory.StartNew(() =>
{
Index.ResourceIndexDocument document = null;
while (this.documents.TryTake(out document, TimeSpan.FromSeconds(1)))
{
IEnumerable<Index.ResourceIndexDocument> documents = this.documents.Take(this.documents.Count);
Index.IndexEngine.Instance.index(documents);
}
},
TaskCreationOptions.LongRunning
);