C# Parallel.foreach - 使变量线程安全

本文关键字:变量 线程 安全 Parallel foreach | 更新日期: 2023-09-27 18:30:36

我一直在重写一些进程密集型循环以使用 TPL 来提高速度。这是我第一次尝试线程,所以想检查我正在做的事情是否正确。

结果很好 - 从标准foreach循环移动到Parallel.ForEach循环时,在DataTable中处理 1000 行的数据将处理时间从 34 分钟减少到 9 分钟。对于此测试,我删除了非线程安全操作,例如将数据写入日志文件和递增计数器。

我仍然需要写回日志文件并递增计数器,所以我尝试实现一个锁来包裹流编写器/增量代码块。

FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
StreamWriter streamwriter = new StreamWriter(filestream);
streamwriter.AutoFlush = true;
try
{
    object locker = new object();
    // Lets assume we have a DataTable containing 1000 rows of data.
    DataTable datatable_results;
    if (datatable_results.Rows.Count > 0)
    {
        int row_counter = 0;
        Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.
            // When ready to write to log, do so.
            lock (locker)
            {
                row_counter++;
                streamwriter.WriteLine("Processing row: {0}", row_counter);
                // Write any data we want to log.
            }
        });                    
    }
}
catch (Exception e)
{
    // Catch the exception.
}
streamwriter.Close();

上述方法似乎按预期工作,性能成本最低(仍为 9 分钟执行时间)。当然,锁中包含的操作本身并不重要 - 我假设随着在锁中处理代码所花费的时间增加,线程被锁定的时间越长,它对处理时间的影响就越大。

我的问题:上述方法是否是一种有效的方法,或者是否有一种更快或更安全的其他方法来实现上述目标?

另外,假设我们的原始DataTable实际上包含 30000 行。将这个DataTable拆分成每个 1000 行的块,然后在Parallel.ForEach中处理它们,而不是一次性处理所有 300000 行,有什么好处吗?

C# Parallel.foreach - 使变量线程安全

写入文件很昂贵,您在写入文件时持有独占锁,这很糟糕。这将引入争用。

您可以将其添加到缓冲区中,然后一次性写入所有文件。这应该可以消除争用并提供缩放方式。

if (datatable_results.Rows.Count > 0)
{
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
    {
        // Process data_row as normal.
        // When ready to write to log, do so.
        buffer.Enqueue(string.Format( "Processing row: {0}", index));
    });
    streamwriter.AutoFlush = false;
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
    streamwriter.Flush();//Flush once when needed
}
  1. 请注意,您不需要维护循环计数器, Parallel.ForEach为您提供了一个。不同的是它不是计数器但索引。如果我更改了预期的行为,您可以仍然添加计数器背面并使用Interlocked.Increment递增它。
  2. 我看到您正在使用streamwriter.AutoFlush = true,这会损害性能,您可以将其设置为false并在完成写入所有数据后刷新它。

如果可能,将StreamWriter包装在 using 语句中,这样您甚至不需要刷新流(您可以免费获得它)。

或者,您可以查看日志记录框架,它们的工作做得很好。例如:NLog,Log4net等。

如果您避免日志记录,或者仅登录特定于线程的日志文件(不确定这对您是否有意义),您可以尝试改进这一点。

TPL启动尽可能多的内核的线程 Parallel.ForEach 是否限制了活动线程的数量?。

所以你能做的是:

1) 获取目标机器上的内核数

2)创建一个计数器列表,

其中包含尽可能多的内核

3) 每个内核的更新计数器

4)在并行执行终止后将它们全部汇总。

所以,在实践中:

//KEY(THREAD ID, VALUE: THREAD LOCAL COUNTER)
Dictionary<int,int> counters = new Dictionary<int, int>(NUMBER_OF_CORES); 
....
 Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.
            // When ready to write to log, do so.
            //lock (locker) //NO NEED FOR LOCK, EVERY THREAD UPDATES ITS _OWN_ COUNTER
            //{
                //row_counter++;
                 counters[Thread.CurrentThread.ManagedThreadId].Value +=1;
                //NO WRITING< OR WRITING THREAD SPECIFIC FILE ONLY
                //streamwriter.WriteLine("Processing row: {0}", row_counter);

            //}
        });             
....
//AFTER EXECUTION OF PARALLEL LOOP SUM ALL COUNTERS AND GET TOTAL OF ALL THREADS. 

这样做的好处是根本不涉及锁定这将大大提高性能。使用 .net 并发集合时,它们始终在内部使用某种锁定。

这自然是一个基本的想法,如果您复制粘贴,可能无法按预期工作。我们正在谈论 多线程 ,这始终是一个困难的话题。但是,希望它能为您提供一些可以传达的想法。

首先,处理表中的行大约需要 2 秒,递增计数器并写入日志文件可能需要几毫秒。由于实际处理比您需要序列化的部分多 1000 倍,因此该方法并不重要。

此外,您实现它的方式非常可靠。有一些方法可以优化它,但没有一种值得在您的情况下实施。

避免锁定增量的一种有用方法是使用 Interlocked.Increment。它比x++慢一点,但比lock {x++;}快得多。不过,在您的情况下,这并不重要。

至于文件输出,请记住,无论如何输出都会被序列化,所以充其量你可以最大限度地减少在锁中花费的时间。您可以通过在进入锁之前缓冲所有输出来执行此操作,然后只需在锁内执行写入操作即可。您可能希望执行异步写入以避免不必要的 I/O 阻塞。

您可以在新方法中传输并行代码。例如:

    // Class scope
    private string GetLogRecord(int rowCounter, DataRow row)
    {
        return string.Format("Processing row: {0}", rowCounter); // Write any data we want to log.
    }
    //....
    Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
    { 
        // Process data_row as normal.
        // When ready to write to log, do so.
        lock (locker)
            row_counter++;
        var logRecord = GetLogRecord(row_counter, data_row);
        lock (locker)
            streamwriter.WriteLine(logRecord);
    });
这是我

使用并行的代码。 这个概念是相似的,也许更容易实现。 仅供参考,为了调试,我在代码中保留了一个常规的 for 循环,并有条件地编译并行代码。 希望这有帮助。 但是,此方案中 i 的值与处理的记录数不同。 您可以创建一个计数器并使用锁并为其添加值。 对于我确实有计数器的其他代码,我没有使用锁,只是允许值可能关闭以避免较慢的代码。 我有一个状态机制来指示处理的记录数。 对于我的实现,计数不是问题的微小可能性 - 在循环结束时,我发布了一条消息,说所有记录都已处理。

#if DEBUG
        for (int i = 0; i < stend.PBBIBuckets.Count; i++)
        {
            //int serverIndex = 0;
#else
        ParallelOptions options = new ParallelOptions();
        options.MaxDegreeOfParallelism = m_maxThreads;
        Parallel.For(0, stend.PBBIBuckets.Count, options, (i) =>
        {
#endif
            g1client.Message request;
            DataTable requestTable;
            request = new g1client.Message();
            requestTable = request.GetDataTable();
            requestTable.Columns.AddRange(
                Locations.Columns.Cast<DataColumn>().Select(x => new DataColumn(x.ColumnName, x.DataType)).ToArray
                    ());
            FillPBBIRequestTables(requestTable, request, stend.PBBIBuckets[i], stend.BucketLen[i], stend.Hierarchies);
#if DEBUG
        }
#else
        });
#endif