C#TPL以并行方式调用任务并异步创建新文件

本文关键字:新文件 创建 文件 异步 并行 方式 调用 任务 C#TPL | 更新日期: 2023-09-27 17:59:31

我正在努力学习TPL。我以类似这样的并行方式写入文件:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    foreach (var str in list)
    {
        string fullPath = path + @"'" + count.ToString() + "_element.txt";
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(str);
        }
        count++;
        Log("Saved in thread: {0} to {1}", 
           Environment.CurrentManagedThreadId,
           fullPath);
        if (ct.IsCancellationRequested)
            ct.ThrowIfCancellationRequested();
    }
}

这样称呼它:

var tasks = new List<Task>();
try
{
    tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
    Log("Failed to save: " + ex.Message);
    throw;
}
tasks.Add(MySecondFuncAsync(), cts.Token);
//...
tasks.Add(MyLastFuncAsync(), cts.Token);
try
{
    //Or should I call await Task.WhenAll(tasks) ? What should I call here?
    Task.WaitAll(tasks.ToArray()); 
}
catch (AggregateException ex)
{
    foreach (var v in ex.InnerExceptions)
       Error(ex.Message + " " + v.Message);
}
finally
{
   cts.Dispose();
} 
foreach (task in tasks)
{
// Now, how to print results from the tasks? 
//Considering that all tasks return bool value, 
//I need to do something like this:
if (task.Status != TaskStatus.Faulted)
         Console.Writeline(task.Result);
else
         Log("Error...");
}

我的目标是使所有函数(SaveToFilesAsyncMySecondFuncAsync)以并行方式同时运行,使用计算机上的所有核心并节省时间。但当我看到SaveToFilesAsync的日志时,我意识到保存到文件总是发生在同一个线程中,而不是并行的。我做错了什么?第二个问题:如何从代码末尾的任务列表中的每个任务中获得Task.Result?如果第二个函数返回Task(bool),我如何在代码中获得bool值?此外,所有关于我的代码的评论都是非常受欢迎的,因为我是TPL的新员工。

C#TPL以并行方式调用任务并异步创建新文件

您需要将foreach循环(从第一个项目到最后一个项目按顺序运行)替换为可配置为并行的Parallel.foreach()循环,或为您提供当前处理项目的索引的Parallel.for()循环。由于您需要对文件名使用计数器,因此您需要修改list参数以提供创建列表时填充的文件号,或者使用Parallel.for()提供的索引。另一种选择是使用一个长变量,您可以在创建文件名后对其执行Interlocked.Increment,但我不确定这是否是最佳的,我还没有尝试过。

下面是它的样子。

在try/catch中包装将调用SaveFilesAsync的代码,以处理通过CancellationTokenSource 取消的操作

var cts = new CancellationTokenSource();
try
{
    Task.WaitAll(SaveFilesAsync(@"C:'Some'Path", files, cts.Token));
}
catch (Exception)
{
    Debug.Print("SaveFilesAsync Exception");
}
finally
{
    cts.Dispose();
}

然后用这种方法进行并行处理。

public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
    int counter = 0;
    var options = new ParallelOptions
                      {
                          CancellationToken = token,
                          MaxDegreeOfParallelism = Environment.ProcessorCount,
                          TaskScheduler = TaskScheduler.Default
                      };
    await Task.Run(
        () =>
            {
                try
                {
                    Parallel.ForEach(
                        list,
                        options,
                        (item, state) =>
                            {
                                // if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
                                options.CancellationToken.ThrowIfCancellationRequested();
                                // safely increment and get your next file number
                                int index = Interlocked.Increment(ref counter);
                                string fullPath = string.Format(@"{0}'{1}_element.txt", path, index);
                                using (var sw = File.CreateText(fullPath))
                                {
                                    sw.WriteLine(item);
                                }
                                Debug.Print(
                                    "Saved in thread: {0} to {1}",
                                    Thread.CurrentThread.ManagedThreadId,
                                    fullPath);
                            });
                }
                catch (OperationCanceledException)
                {
                    Debug.Print("Operation Canceled");
                }
            });
}

代码的另一部分不会更改,只需调整创建文件内容列表的位置即可。

编辑:调用SaveFileAsync方法的try/catch实际上什么都不做,它都是在SaveFileAsync。

试试这个:

public async Task SaveToFileAsync(string fullPath, line)
{
    using (var sw = File.CreateText(fullPath))
    {
        await sw.WriteLineAsync(str);
    }
    Log("Saved in thread: {0} to {1}", 
       Environment.CurrentManagedThreadId,
       fullPath);
}
public async Task SaveToFilesAsync(string path, List<string> list)
{
    await Task.WhenAll(
        list
            .Select((line, i) =>
                SaveToFileAsync(
                    string.Format(
                        @"{0}'{1}_element.txt",
                        path,
                        i),
                    line));
}

由于每个文件只写一行,并且您想将其全部并行化,我认为这是不可取消的。