我可以将while(true)循环转换为EventWaitHandle吗?

本文关键字:转换 EventWaitHandle 循环 while true 我可以 | 更新日期: 2023-09-27 18:31:30

我正在尝试通过将处理后的数据添加到BlockingCollection Parallel.ForEach来处理大量文本文件。

问题是我希望 taskWriteMergedFile Task使用集合并至少每 800000 行将它们写入结果文件。

我想我无法在迭代中测试集合大小,因为它是并行的,所以我创建了Task.

在这种情况下,我可以将任务中的 while(true) 循环转换为 EventWaitHandle 吗?

const int MAX_SIZE = 1000000;
static BlockingCollection<string> mergeData;
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE);

string[] FilePaths = Directory.GetFiles("somepath");
var taskWriteMergedFile = new Task(() =>
{
    while ( true )
    {
        if ( mergeData.Count  > 800000)
        {
            String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable());
            //Write to file
        }
        Thread.Sleep(10000); 
    }
}, TaskCreationOptions.LongRunning);
taskWriteMergedFile.Start();
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));
mergeData.CompleteAdding();

我可以将while(true)循环转换为EventWaitHandle吗?

你可能不想那样做。相反,让您的任务在收到文件时将每一行写入文件。如果要将文件大小限制为 80,000 行,则在写入第 80,000 行后,关闭当前文件并打开一个新文件。

想想看,您拥有的东西不起作用,因为GetConsumingEnumerable()在集合标记为完成以进行添加之前不会停止。将发生的情况是,该事物将通过睡眠循环,直到队列中有 80,000 个项目,然后它会阻塞String.Join,直到主线程调用CompleteAdding。如果有足够的数据,您的内存就会不足。

此外,除非您有很好的理由,否则您不应该在此处使用ConcurrentBag。只需使用 BlockingCollection 的默认值,即 ConcurrentQueueConcurrentBag是一种相当特殊的数据结构,其性能不如ConcurrentQueue

因此,您的任务变为:

var taskWriteMergedFile = new Task(() =>
{
    int recordCount = 0;
    foreach (var line in mergeData.GetConsumingEnumerable())
    {
        outputFile.WriteLine(line);
        ++recordCount;
        if (recordCount == 80,000)
        {
            // If you want to do something after 80,000 lines, do it here
            // and then reset the record count
            recordCount = 0;
        }
    }
}, TaskCreationOptions.LongRunning);

当然,前提是您已在其他地方打开了输出文件。最好在任务开始时打开输出,并在foreach退出后将其关闭。

另一方面,您可能不希望生产者循环是并行的。你有:

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));

我不确定AddToDataPool在做什么,但如果它是读取文件并将数据写入集合,那么您会遇到一些问题。首先,磁盘驱动器一次只能做一件事,因此它最终会读取一个文件的一部分,然后是另一个文件的一部分,然后是另一个文件的一部分,依此类推。为了读取下一个文件的每个块,它必须将磁头寻找到正确的位置。磁盘磁头寻道非常昂贵 - 5毫秒或更长时间。CPU 时间的永恒。除非您要执行比读取文件花费更长的繁重处理时间,否则您几乎总是最好一次处理一个文件。除非您可以保证输入文件位于单独的物理磁盘上。

第二个潜在问题是,在运行多个线程的情况下,无法保证将内容写入集合的顺序。当然,这可能不是问题,但是如果您希望单个文件中的所有数据在输出中组合在一起,则不会发生多个线程,每个线程将多行写入集合。

只是要记住的事情。