当我需要特定于线程的资源时使用并行函数

本文关键字:资源 函数 并行 线程 于线程 | 更新日期: 2023-09-27 17:57:41

当我希望每个线程初始化并使用自己的资源时,我如何使用System.Threading.Tasks.Pararallel函数?例如,我想做的工作需要一个SqlConnection,每个线程都必须有自己的。如何初始化和使用这样的资源?

一个糟糕的解决方案是:

const int PARALLELISM = 8;
SqlConnection[] connections = ...; // init array of connections
JobObject[] jobs = ...; // init array of jobs
Parallel.For(0, PARALLELISM, mod => {
    for(int j=0; j<jobs.Length;j++)
        if (j % PARALLELISM == mod)
            jobs[j].Do(connections[mod]);
    }
);

这个解决方案的糟糕之处在于,它不给下一个可用线程提供工作,不让系统决定并行度,坦率地说,这是对并行库的滥用。

我想要的是:

JobObject[] jobs = ...; // init array of jobs
Parallel.For(0, PARALLELISM, i => {
    SqlConnection connection = // get the connection for the current thread
    jobs[i].Do(connection);
}

当我需要特定于线程的资源时使用并行函数

首先,在我介绍如何执行之前,让我说一下SqlConnection是专门为快速重复地创建和销毁而设计的。当您处理连接时,它不会立即释放资源,而是等待关闭,如果在超时之前使用相同的连接字符串创建了新连接,它将重新使用旧连接。这被称为"连接池",默认情况下它是打开的(您必须在连接字符串中明确设置特殊设置才能禁用它。)

我建议您在函数中创建一个新的连接(我还纠正了一些关于如何设置并行度的错误,您需要传入ParallelOptions参数)

JobObject[] jobs = ...; // init array of jobs
Parallel.For(0, 
             jobs.Length, 
             new ParallelOptions { MaxDegreeOfParallelism = PARALLELISM},
            (i) => 
            {
                using(SqlConnection connection = new SqlConnection(_connectionString)
                {
                    jobs[i].Do(connection);
                }
            });

如果除了访问数组之外不需要索引i,那么也可以将其重写为Parallel.ForEach

JobObject[] jobs = ...; // init array of jobs
Parallel.ForEach(jobs, 
             new ParallelOptions { MaxDegreeOfParallelism = PARALLELISM},
            (job) => 
            {
                using(SqlConnection connection = new SqlConnection(_connectionString)
                {
                    job.Do(connection);
                }
            });

然而,为了回答您的问题,如果您想要一个可重复使用的资源,每个线程,您需要使用这个重载,它为您提供了另外两个函数,您可以传入这些函数来创建和销毁线程本地资源。

JobObject[] jobs = ...; // init array of jobs
Parallel.For(0, 
             jobs.Length,
             new ParallelOptions { MaxDegreeOfParallelism = PARALLELISM},
             () => new SqlConnection(_connectionString), //Thread local init
             (i, loopstate, connection) => 
             {
                 jobs[i].Do(connection);
                 return connection; //Passes the object to the next function that will re-use it.
             },
             (connection) => connection.Dispose()); //thread local finally.

再一次,我不建议您以这种方式执行,而是使用寿命短的SqlConnection以第一种方式执行,类就是这样使用的。