C# Parallel.foreach - 使变量线程安全
本文关键字:变量 线程 安全 Parallel foreach | 更新日期: 2023-09-27 18:30:36
我一直在重写一些进程密集型循环以使用 TPL 来提高速度。这是我第一次尝试线程,所以想检查我正在做的事情是否正确。
结果很好 - 从标准foreach
循环移动到Parallel.ForEach
循环时,在DataTable
中处理 1000 行的数据将处理时间从 34 分钟减少到 9 分钟。对于此测试,我删除了非线程安全操作,例如将数据写入日志文件和递增计数器。
我仍然需要写回日志文件并递增计数器,所以我尝试实现一个锁来包裹流编写器/增量代码块。
FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
StreamWriter streamwriter = new StreamWriter(filestream);
streamwriter.AutoFlush = true;
try
{
object locker = new object();
// Lets assume we have a DataTable containing 1000 rows of data.
DataTable datatable_results;
if (datatable_results.Rows.Count > 0)
{
int row_counter = 0;
Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
// Process data_row as normal.
// When ready to write to log, do so.
lock (locker)
{
row_counter++;
streamwriter.WriteLine("Processing row: {0}", row_counter);
// Write any data we want to log.
}
});
}
}
catch (Exception e)
{
// Catch the exception.
}
streamwriter.Close();
上述方法似乎按预期工作,性能成本最低(仍为 9 分钟执行时间)。当然,锁中包含的操作本身并不重要 - 我假设随着在锁中处理代码所花费的时间增加,线程被锁定的时间越长,它对处理时间的影响就越大。
我的问题:上述方法是否是一种有效的方法,或者是否有一种更快或更安全的其他方法来实现上述目标?
另外,假设我们的原始DataTable
实际上包含 30000 行。将这个DataTable
拆分成每个 1000 行的块,然后在Parallel.ForEach
中处理它们,而不是一次性处理所有 300000 行,有什么好处吗?
写入文件很昂贵,您在写入文件时持有独占锁,这很糟糕。这将引入争用。
您可以将其添加到缓冲区中,然后一次性写入所有文件。这应该可以消除争用并提供缩放方式。
if (datatable_results.Rows.Count > 0)
{
ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
{
// Process data_row as normal.
// When ready to write to log, do so.
buffer.Enqueue(string.Format( "Processing row: {0}", index));
});
streamwriter.AutoFlush = false;
string line;
while (buffer.TryDequeue(out line))
{
streamwriter.WriteLine(line);
}
streamwriter.Flush();//Flush once when needed
}
- 请注意,您不需要维护循环计数器,
Parallel.ForEach
为您提供了一个。不同的是它不是计数器但索引。如果我更改了预期的行为,您可以仍然添加计数器背面并使用Interlocked.Increment
递增它。 - 我看到您正在使用
streamwriter.AutoFlush = true
,这会损害性能,您可以将其设置为false
并在完成写入所有数据后刷新它。
如果可能,将StreamWriter
包装在 using 语句中,这样您甚至不需要刷新流(您可以免费获得它)。
或者,您可以查看日志记录框架,它们的工作做得很好。例如:NLog,Log4net等。
如果您避免日志记录,或者仅登录特定于线程的日志文件(不确定这对您是否有意义),您可以尝试改进这一点。
TPL
启动尽可能多的内核的线程 Parallel.ForEach 是否限制了活动线程的数量?。
所以你能做的是:
1) 获取目标机器上的内核数
2)创建一个计数器列表,
其中包含尽可能多的内核3) 每个内核的更新计数器
4)在并行执行终止后将它们全部汇总。
所以,在实践中:
//KEY(THREAD ID, VALUE: THREAD LOCAL COUNTER)
Dictionary<int,int> counters = new Dictionary<int, int>(NUMBER_OF_CORES);
....
Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
// Process data_row as normal.
// When ready to write to log, do so.
//lock (locker) //NO NEED FOR LOCK, EVERY THREAD UPDATES ITS _OWN_ COUNTER
//{
//row_counter++;
counters[Thread.CurrentThread.ManagedThreadId].Value +=1;
//NO WRITING< OR WRITING THREAD SPECIFIC FILE ONLY
//streamwriter.WriteLine("Processing row: {0}", row_counter);
//}
});
....
//AFTER EXECUTION OF PARALLEL LOOP SUM ALL COUNTERS AND GET TOTAL OF ALL THREADS.
这样做的好处是根本不涉及锁定,这将大大提高性能。使用 .net 并发集合时,它们始终在内部使用某种锁定。
这自然是一个基本的想法,如果您复制粘贴,可能无法按预期工作。我们正在谈论 多线程 ,这始终是一个困难的话题。但是,希望它能为您提供一些可以传达的想法。
首先,处理表中的行大约需要 2 秒,递增计数器并写入日志文件可能需要几毫秒。由于实际处理比您需要序列化的部分多 1000 倍,因此该方法并不重要。
此外,您实现它的方式非常可靠。有一些方法可以优化它,但没有一种值得在您的情况下实施。
避免锁定增量的一种有用方法是使用 Interlocked.Increment。它比x++
慢一点,但比lock {x++;}
快得多。不过,在您的情况下,这并不重要。
至于文件输出,请记住,无论如何输出都会被序列化,所以充其量你可以最大限度地减少在锁中花费的时间。您可以通过在进入锁之前缓冲所有输出来执行此操作,然后只需在锁内执行写入操作即可。您可能希望执行异步写入以避免不必要的 I/O 阻塞。
您可以在新方法中传输并行代码。例如:
// Class scope
private string GetLogRecord(int rowCounter, DataRow row)
{
return string.Format("Processing row: {0}", rowCounter); // Write any data we want to log.
}
//....
Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
// Process data_row as normal.
// When ready to write to log, do so.
lock (locker)
row_counter++;
var logRecord = GetLogRecord(row_counter, data_row);
lock (locker)
streamwriter.WriteLine(logRecord);
});
使用并行的代码。 这个概念是相似的,也许更容易实现。 仅供参考,为了调试,我在代码中保留了一个常规的 for 循环,并有条件地编译并行代码。 希望这有帮助。 但是,此方案中 i 的值与处理的记录数不同。 您可以创建一个计数器并使用锁并为其添加值。 对于我确实有计数器的其他代码,我没有使用锁,只是允许值可能关闭以避免较慢的代码。 我有一个状态机制来指示处理的记录数。 对于我的实现,计数不是问题的微小可能性 - 在循环结束时,我发布了一条消息,说所有记录都已处理。
#if DEBUG
for (int i = 0; i < stend.PBBIBuckets.Count; i++)
{
//int serverIndex = 0;
#else
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = m_maxThreads;
Parallel.For(0, stend.PBBIBuckets.Count, options, (i) =>
{
#endif
g1client.Message request;
DataTable requestTable;
request = new g1client.Message();
requestTable = request.GetDataTable();
requestTable.Columns.AddRange(
Locations.Columns.Cast<DataColumn>().Select(x => new DataColumn(x.ColumnName, x.DataType)).ToArray
());
FillPBBIRequestTables(requestTable, request, stend.PBBIBuckets[i], stend.BucketLen[i], stend.Hierarchies);
#if DEBUG
}
#else
});
#endif