当我需要特定于线程的资源时使用并行函数
本文关键字:资源 函数 并行 线程 于线程 | 更新日期: 2023-09-27 17:57:41
当我希望每个线程初始化并使用自己的资源时,我如何使用System.Threading.Tasks.Pararallel
函数?例如,我想做的工作需要一个SqlConnection,每个线程都必须有自己的。如何初始化和使用这样的资源?
一个糟糕的解决方案是:
const int PARALLELISM = 8;
SqlConnection[] connections = ...; // init array of connections
JobObject[] jobs = ...; // init array of jobs
Parallel.For(0, PARALLELISM, mod => {
for(int j=0; j<jobs.Length;j++)
if (j % PARALLELISM == mod)
jobs[j].Do(connections[mod]);
}
);
这个解决方案的糟糕之处在于,它不给下一个可用线程提供工作,不让系统决定并行度,坦率地说,这是对并行库的滥用。
我想要的是:
JobObject[] jobs = ...; // init array of jobs
Parallel.For(0, PARALLELISM, i => {
SqlConnection connection = // get the connection for the current thread
jobs[i].Do(connection);
}
首先,在我介绍如何执行之前,让我说一下SqlConnection
是专门为快速重复地创建和销毁而设计的。当您处理连接时,它不会立即释放资源,而是等待关闭,如果在超时之前使用相同的连接字符串创建了新连接,它将重新使用旧连接。这被称为"连接池",默认情况下它是打开的(您必须在连接字符串中明确设置特殊设置才能禁用它。)
我建议您在函数中创建一个新的连接(我还纠正了一些关于如何设置并行度的错误,您需要传入ParallelOptions
参数)
JobObject[] jobs = ...; // init array of jobs
Parallel.For(0,
jobs.Length,
new ParallelOptions { MaxDegreeOfParallelism = PARALLELISM},
(i) =>
{
using(SqlConnection connection = new SqlConnection(_connectionString)
{
jobs[i].Do(connection);
}
});
如果除了访问数组之外不需要索引i
,那么也可以将其重写为Parallel.ForEach
。
JobObject[] jobs = ...; // init array of jobs
Parallel.ForEach(jobs,
new ParallelOptions { MaxDegreeOfParallelism = PARALLELISM},
(job) =>
{
using(SqlConnection connection = new SqlConnection(_connectionString)
{
job.Do(connection);
}
});
然而,为了回答您的问题,如果您想要一个可重复使用的资源,每个线程,您需要使用这个重载,它为您提供了另外两个函数,您可以传入这些函数来创建和销毁线程本地资源。
JobObject[] jobs = ...; // init array of jobs
Parallel.For(0,
jobs.Length,
new ParallelOptions { MaxDegreeOfParallelism = PARALLELISM},
() => new SqlConnection(_connectionString), //Thread local init
(i, loopstate, connection) =>
{
jobs[i].Do(connection);
return connection; //Passes the object to the next function that will re-use it.
},
(connection) => connection.Dispose()); //thread local finally.
再一次,我不建议您以这种方式执行,而是使用寿命短的SqlConnection
以第一种方式执行,类就是这样使用的。