正在读取大量文件“;同时”;

本文关键字:同时 文件 读取 | 更新日期: 2023-09-27 18:25:25

我使用FileSystemWatcher是为了捕获文件夹中任何文件上的每个createdchangeddeletedrenamed更改。

通过这些更改,我需要对这些文件的内容执行简单的校验和。简单地说,我打开一个文件流并将其传递给MD5类:

private byte[] calculateChecksum(string frl)
{
    using (FileStream stream = File.Open(frl, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    {
        return this.md5.ComputeHash(stream);
    }
}

问题在于我需要处理的文件数量。例如,假设我在一个文件夹中创建了200个文件,然后我将所有文件复制并粘贴到同一个文件夹上。此操作将导致200个事件和200个calculateChecksum()执行。

我该如何解决这类问题?

正在读取大量文件“;同时”;

FileSystemWatcher中,处理程序将任务放入将由某个工作人员处理的队列中。Worker可以以目标速度或/和频率处理校验和计算任务。也许一个工作人员会更好,因为许多读者可以通过多次阅读来减慢hdd的速度。

尝试阅读有关BlockingCollection的信息:https://msdn.microsoft.com/ru-ru/library/dd997371(v=vs.110).aspx

和生产者-消费者数据流模式https://msdn.microsoft.com/ru-ru/library/hh228601(v=vs.110).aspx

var workerCount = 2;
BlockingCollection<String>[] filesQueues= new BlockingCollection<String>[workerCount];
for(int i = 0; i < workerCount; i++)
{
    filesQueues[i] = new BlockingCollection<String>(500);
    // Worker
    Task.Run(() => 
    {
        while (!filesQueues[i].IsCompleted)
        {
            string url;
            try
            {
                url= filesQueues[i].Take();
            }
            catch (InvalidOperationException) { }
            if (!string.IsNullOrWhiteSpace(url))
            {
                calculateChecksum(url);
            }
        }
    }
}

//FileSystemWatcher处理程序内部

    var queueIndex = hash(filename) % workersCount
    // Warning!!
    // Blocks if numbers.Count == dataItems.BoundedCapacity
    filesQueues[queueIndex].Add(fileName);
    filesQueues[queueIndex].CompleteAdding();

此外,您还可以创建多个消费者,只需同时调用Take或TryTake-每个商品将只由一个消费者消费。但考虑到在这种情况下,一个文件可以由许多工作人员处理,而多个hdd读取器可能会减慢hdd的速度。

如果有多个工作人员,则最好制作多个BlockingCollections,并使用索引将文件推送到队列中:

我已经创建了一个消费者生产者模式来解决这个问题,并且我尝试使用线程池来平滑大量的工作,共享BlockingCollection

阻止集合&线程池:

private BlockingCollection<Index.ResourceIndexDocument> documents;
this.pool = new SmartThreadPool(SmartThreadPool.DefaultIdleTimeout, 4);
this.documents = new BlockingCollection<string>();

正如您所看到的,我已经创建了一个I treadPool,将并发性设置为4。因此,无论池中是否有x > 4工作的单元要处理,在同一时间只有4个线程在工作。

生产者

public void warn(string channel, string frl)
{
    this.pool.QueueWorkItem<string, string>(
        (file) => this.files.Add(file),
        channel,
        frl
    );
}

消费者

Task.Factory.StartNew(() =>
{
    Index.ResourceIndexDocument document = null;
    while (this.documents.TryTake(out document, TimeSpan.FromSeconds(1)))
    {
        IEnumerable<Index.ResourceIndexDocument> documents = this.documents.Take(this.documents.Count);
        Index.IndexEngine.Instance.index(documents);
    }
},
TaskCreationOptions.LongRunning
);