C#TPL以并行方式调用任务并异步创建新文件
本文关键字:新文件 创建 文件 异步 并行 方式 调用 任务 C#TPL | 更新日期: 2023-09-27 17:59:31
我正在努力学习TPL。我以类似这样的并行方式写入文件:
public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
int count = 0;
foreach (var str in list)
{
string fullPath = path + @"'" + count.ToString() + "_element.txt";
using (var sw = File.CreateText(fullPath))
{
await sw.WriteLineAsync(str);
}
count++;
Log("Saved in thread: {0} to {1}",
Environment.CurrentManagedThreadId,
fullPath);
if (ct.IsCancellationRequested)
ct.ThrowIfCancellationRequested();
}
}
这样称呼它:
var tasks = new List<Task>();
try
{
tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
Log("Failed to save: " + ex.Message);
throw;
}
tasks.Add(MySecondFuncAsync(), cts.Token);
//...
tasks.Add(MyLastFuncAsync(), cts.Token);
try
{
//Or should I call await Task.WhenAll(tasks) ? What should I call here?
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ex)
{
foreach (var v in ex.InnerExceptions)
Error(ex.Message + " " + v.Message);
}
finally
{
cts.Dispose();
}
foreach (task in tasks)
{
// Now, how to print results from the tasks?
//Considering that all tasks return bool value,
//I need to do something like this:
if (task.Status != TaskStatus.Faulted)
Console.Writeline(task.Result);
else
Log("Error...");
}
我的目标是使所有函数(SaveToFilesAsync
、MySecondFuncAsync
)以并行方式同时运行,使用计算机上的所有核心并节省时间。但当我看到SaveToFilesAsync
的日志时,我意识到保存到文件总是发生在同一个线程中,而不是并行的。我做错了什么?第二个问题:如何从代码末尾的任务列表中的每个任务中获得Task.Result?如果第二个函数返回Task(bool),我如何在代码中获得bool值?此外,所有关于我的代码的评论都是非常受欢迎的,因为我是TPL的新员工。
您需要将foreach循环(从第一个项目到最后一个项目按顺序运行)替换为可配置为并行的Parallel.foreach()循环,或为您提供当前处理项目的索引的Parallel.for()循环。由于您需要对文件名使用计数器,因此您需要修改list参数以提供创建列表时填充的文件号,或者使用Parallel.for()提供的索引。另一种选择是使用一个长变量,您可以在创建文件名后对其执行Interlocked.Increment,但我不确定这是否是最佳的,我还没有尝试过。
下面是它的样子。
在try/catch中包装将调用SaveFilesAsync的代码,以处理通过CancellationTokenSource 取消的操作
var cts = new CancellationTokenSource();
try
{
Task.WaitAll(SaveFilesAsync(@"C:'Some'Path", files, cts.Token));
}
catch (Exception)
{
Debug.Print("SaveFilesAsync Exception");
}
finally
{
cts.Dispose();
}
然后用这种方法进行并行处理。
public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
int counter = 0;
var options = new ParallelOptions
{
CancellationToken = token,
MaxDegreeOfParallelism = Environment.ProcessorCount,
TaskScheduler = TaskScheduler.Default
};
await Task.Run(
() =>
{
try
{
Parallel.ForEach(
list,
options,
(item, state) =>
{
// if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
options.CancellationToken.ThrowIfCancellationRequested();
// safely increment and get your next file number
int index = Interlocked.Increment(ref counter);
string fullPath = string.Format(@"{0}'{1}_element.txt", path, index);
using (var sw = File.CreateText(fullPath))
{
sw.WriteLine(item);
}
Debug.Print(
"Saved in thread: {0} to {1}",
Thread.CurrentThread.ManagedThreadId,
fullPath);
});
}
catch (OperationCanceledException)
{
Debug.Print("Operation Canceled");
}
});
}
代码的另一部分不会更改,只需调整创建文件内容列表的位置即可。
编辑:调用SaveFileAsync方法的try/catch实际上什么都不做,它都是在SaveFileAsync。
试试这个:
public async Task SaveToFileAsync(string fullPath, line)
{
using (var sw = File.CreateText(fullPath))
{
await sw.WriteLineAsync(str);
}
Log("Saved in thread: {0} to {1}",
Environment.CurrentManagedThreadId,
fullPath);
}
public async Task SaveToFilesAsync(string path, List<string> list)
{
await Task.WhenAll(
list
.Select((line, i) =>
SaveToFileAsync(
string.Format(
@"{0}'{1}_element.txt",
path,
i),
line));
}
由于每个文件只写一行,并且您想将其全部并行化,我认为这是不可取消的。