具有大量文件IO任务的多线程
本文关键字:任务 多线程 IO 文件 | 更新日期: 2023-09-27 18:14:52
我对c#并不完全陌生,但我对这门语言还不够熟悉,不知道如何做我需要做的事情。
我有一个文件,叫做File1.txt。File1.txt大约有10万行。我将复制File1.txt并将其命名为file_untesting .txt。我还将创建一个空文件success .txt对于文件中的每一行:
- 从file1_untesting .txt中删除这一行
- 如果这行通过了测试,将其写入success .txt
所以,我的问题是,我怎么能多线程这个?
到目前为止,我的方法是创建一个对象(LineChecker),为对象提供要检查的行,并将对象传递到ThreadPool。我了解如何使用threadpool与CountdownEvent的几个任务。然而,一次将100,000个任务全部排队似乎是不合理的。我怎样才能逐渐地给水池喂食?可能一次1000行或者类似的。另外,我需要确保没有两个线程同时添加到success .txt或从file1_untesting .txt中删除。我可以用lock()来处理这个,对吧?我应该向lock()传递什么?我可以使用LineChecker的静态成员吗?
我只是想对如何设计这样的东西有一个大致的了解。
由于测试需要相当多的时间,因此使用多个CPU内核是有意义的。但是,这种利用应该只用于相对昂贵的测试,而不是用于读取/更新文件。这是因为读取/更新文件相对便宜。
下面是一些可以使用的示例代码:
假设你有一个相对昂贵的Test方法:
private bool Test(string line)
{
//This test is expensive
}
下面是一个可以利用多个CPU进行测试的代码示例:
这里我们将集合中的项数限制为10,以便正在从文件中读取的线程在从文件中读取更多行之前等待其他线程赶上。
这个输入线程的读取速度比其他线程的测试速度要快得多,所以在最坏的情况下,我们将比测试线程多读取10行。这确保我们有良好的内存消耗。
CancellationTokenSource cancellation_token_source = new CancellationTokenSource();
CancellationToken cancellation_token = cancellation_token_source.Token;
BlockingCollection<string> blocking_collection = new BlockingCollection<string>(10);
using (StreamReader reader = new StreamReader(new FileStream(filename, FileMode.Open, FileAccess.Read)))
{
using (
StreamWriter writer =
new StreamWriter(new FileStream(success_filename, FileMode.OpenOrCreate, FileAccess.Write)))
{
var input_task = Task.Factory.StartNew(() =>
{
try
{
while (!reader.EndOfStream)
{
if (cancellation_token.IsCancellationRequested)
return;
blocking_collection.Add(reader.ReadLine());
}
}
finally //In all cases, even in the case of an exception, we need to make sure that we mark that we have done adding to the collection so that the Parallel.ForEach loop will exit. Note that Parallel.ForEach will not exit until we call CompleteAdding
{
blocking_collection.CompleteAdding();
}
});
try
{
Parallel.ForEach(blocking_collection.GetConsumingEnumerable(), (line) =>
{
bool test_reault = Test(line);
if (test_reault)
{
lock (writer)
{
writer.WriteLine(line);
}
}
});
}
catch
{
cancellation_token_source.Cancel(); //If Paralle.ForEach throws an exception, we inform the input thread to stop
throw;
}
input_task.Wait(); //This will make sure that exceptions thrown in the input thread will be propagated here
}
}
如果您的"测试"很快,那么多线程不会给您带来任何好处,因为您的代码将是100%磁盘绑定的,并且假定您将所有文件放在同一个磁盘上:您无法通过多线程提高单个磁盘的吞吐量。
但是因为你的"测试"将等待一个web服务器的响应,这意味着测试将是缓慢的,所以有很多空间可以通过多线程来改进。基本上,您需要的线程数取决于web服务器可以同时处理多少请求而不会降低web服务器的性能。这个数字可能仍然很低,所以您可能最终不会获得任何东西,但至少您可以尝试。
如果你的文件不是很大,那么你可以一次读完它,然后一次写完它。如果每行只有80个字符长,那么这意味着你的文件只有8兆字节,这是微不足道的,所以你可以把所有的行读入一个列表,在列表上工作,产生另一个列表,最后写出整个列表。
这将允许您创建一个结构,例如,MyLine
,其中包含每行的索引和每行的文本,以便您可以在编写之前对所有行进行排序,这样您就不必担心来自服务器的乱序响应。
然后,你需要做的是像@Paul建议的那样使用BlockingCollection
这样的边界阻塞队列。
BlockingCollection
接受其最大容量作为构造函数参数。这意味着一旦达到它的最大容量,任何进一步添加它的尝试都将被阻止(调用者坐在那里等待),直到一些项目被删除。因此,如果您希望同时有多达10个挂起请求,您可以这样构造它:
var sourceCollection = new BlockingCollection<MyLine>(10);
主线程将用MyLine
对象填充sourceCollection
,并且您将有10个线程阻塞等待从集合中读取MyLine
。每个线程向服务器发送请求,等待响应,将结果保存到线程安全的resultCollection
中,并尝试从sourceCollection
中获取下一项。
你可以使用c#的async
特性来代替使用多线程,但是我对它们不是很熟悉,所以我不能准确地建议你如何做到这一点。
最后,将resultCollection
的内容复制到List
中,对列表进行排序,并写入输出文件。(复制到一个单独的List
可能是一个好主意,因为排序线程安全的resultCollection
将可能比排序非线程安全的List
要慢得多。我说可能)