多线程文件处理
本文关键字:处理 文件 多线程 | 更新日期: 2023-09-27 18:22:18
我想优化这个代码:
public static void ProcessTo(this StreamReader sr, StreamWriter sw, Action<StreamWriter, string> action, FileProcessOptions fpo = null)
{
if (fpo == null)
{
fpo = new FileProcessOptions();
}
List<string> buffer = new List<string>(fpo.BuferSize);
while (!sr.EndOfStream)
{
buffer.Clear();
while (!sr.EndOfStream && buffer.Count < fpo.BuferSize)
{
buffer.Add(sr.ReadLine());
}
if (fpo.UseThreads)
{
buffer.AsParallel().ForAll(line => action(sw, line));
}
else
{
buffer.ForEach(line => action(sw, line));
}
}
}
我处理大量的数据,并希望将这个过程并行化。通常数据是存档的,因此使用多个线程来处理数据流是非常重要的。
如果不传递StreamReader
,而只传递文件名,则可以编写:
Parallel.Foreach(File.ReadLines(filename), (line) => action(sw, line));
如果你通过了StreamReader
,你仍然可以这样做。你只需要创建一个枚举器来读取它。类似于这里所做的:GetEnumerator()在实现IEnumerable<T>并且IEnumerator<T>。使用它,你会写:
LineReaderEnumerable myEnumerable = new LineEnumerator(sr);
Parallel.Foreach(myEnumerable, (line) => action(sw, line));
然而,您可能会遇到一个潜在的问题,因为您可能会有多个线程向该流编写器写入。并且StreamWriter
不支持并发写入。它将抛出一个异常。如果您正在同步对输出文件的访问(例如使用锁),那么您在这里就可以了。
你会遇到的另一个问题是事物的输出顺序。几乎可以肯定的是,如果按[1, 2, 3, 4, ... n]
的顺序读取行,则输出顺序会有所不同。你可能会得到[1, 2, 4, 3, 6, 5, 7, 9, 8 ... n, n-1]
。如果输出顺序很重要,那么您必须想出一种方法来确保按正确的顺序输出。
关于锁,您有:
sr.ProcessParalel(line =>
{
string[] ls = line.Split(''t');
lock (sw)
{
sw.Write(float.Parse(ls[0]));
sw.Write(int.Parse(ls[1]) * 10 + 1);
for (int i = 2; i < ls.Length; i++)
{
sw.Write(int.Parse(ls[1]));
}
}
});
问题不在于锁。问题是你锁的时间太长了。按照您的编写方式,代码实际上是单线程的,因为所有线程都在等待该锁来进行处理。您需要更改处理,以便锁定的时间尽可能短。
将输出构建为StringBuilder
,将其转换为字符串,然后输出该字符串。例如:
string[] ls = line.Split(''t');
StringBuilder sb = new StringBuilder();
sb.Append(float.Parse(ls[0]));
sb.Append(' ');
sb.Append(int.Parse(ls[1])) * 10 + 1);
for (int i = 2; i < ls.Length; i++)
{
sb.Append(' ');
sb.Append(int.Parse(ls[i])); }
}
var sout = sb.ToString();
// lock and write
lock (sw)
{
sw.Write(sout);
}
使用StringWriter也可以做同样的事情。
最终解决方案:
public static IEnumerable<string> GetEnumirator(this StreamReader sr)
{
while (!sr.EndOfStream)
{
yield return sr.ReadLine();
}
}
public static void ProcessParalel(this StreamReader sr, Action<string> action)
{
sr.GetEnumirator().AsParallel().ForAll(action);
}
public static void ProcessTo(this StreamReader sr, BinaryWriter bw, Action<BinaryWriter, string> action, FileProcessOptions fpo = null)
{
sr.ProcessParalel(line =>
{
using (MemoryStream ms = new MemoryStream())
{
BinaryWriter lbw = new BinaryWriter(ms);
action(lbw, line);
ms.Seek(0, SeekOrigin.Begin);
lock (bw)
{
ms.WriteTo(bw.BaseStream);
}
}
});
}
在压缩输入流的情况下,我得到了3倍的加速度