Parallel.ForEach SQL querying sometimes results in a Connection error

Keywords: Connection, query, Foreach, SQL | Updated: 2023-09-27 18:09:59

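I need to speed up the execution of 12 queries in my application. I switched from a regular foreach to Parallel.ForEach. But sometimes I get an error saying "ExecuteReader requires an open and available Connection. The connection's current state is connecting." My understanding is that, because many of the 12 queries use the same Initial Catalog, there is not really a new connection for each of the 12 queries, and that this may be the problem. How can I fix this? "sql" is a list of type "Sql"; the class is just a string name, a string connection, and a list of queries. Here is the code: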

 /// <summary>
    /// Connects to SQL, performs all queries and stores results in a list of DataTables
    /// </summary>
    /// <returns>List of data tables for each query in the config file</returns>
    public List<DataTable> GetAllData()
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        List<DataTable> data = new List<DataTable>();
         List<Sql> sql=new List<Sql>();
        Sql one = new Sql();
         one.connection = "Data Source=XXX-SQL1;Initial Catalog=XXXDB;Integrated Security=True";
         one.name = "Col1";
         one.queries.Add("SELECT Name FROM [Reports]");
         one.queries.Add("SELECT Other FROM [Reports2]");
         sql.Add(one);
        Sql two = new Sql();
         two.connection = "Data Source=XXX-SQL1;Initial Catalog=XXXDB;Integrated Security=True";
         two.name = "Col2";
         two.queries.Add("SELECT AlternateName FROM [Reports1]");
         sql.Add(two);
         Sql three = new Sql();
         three.connection = "Data Source=YYY-SQL2;Initial Catalog=YYYDB;Integrated Security=True";
         three.name = "Col3";
         three.queries.Add("SELECT Frequency FROM Times");
         sql.Add(three);

        try
        {
            // ParallelOptions options = new ParallelOptions();
            //options.MaxDegreeOfParallelism = 3;
            // Parallel.ForEach(sql, options, s =>
            Parallel.ForEach(sql, s =>
            //foreach (Sql s in sql)
            {
                foreach (string q in s.queries)
                {
                    using (connection = new SqlConnection(s.connection))
                    {
                        connection.Open();
                        DataTable dt = new DataTable();
                        dt.TableName = s.name;
                        command = new SqlCommand(q, connection);
                        SqlDataAdapter adapter = new SqlDataAdapter();
                        adapter.SelectCommand = command;
                        adapter.Fill(dt);
                        //adapter.Dispose();
                        lock (data)
                        {
                            data.Add(dt);
                        }
                    }
                }
            }
            );
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.ToString(), "GetAllData error");
        }
        sw.Stop();
        MessageBox.Show(sw.Elapsed.ToString());
        return data;
    }

Here is the Sql class I created, in case you need it:

/// <summary>
/// Class defines a SQL connection and its respective queries
/// </summary>
public class Sql
{
    /// <summary>
    /// Name of the connection/query
    /// </summary>
    public string name { get; set; }
    /// <summary>
    /// SQL Connection string
    /// </summary>
    public string connection { get; set; }
    /// <summary>
    /// List of SQL queries for a connection
    /// </summary>
    public List<string> queries = new List<string>();
}


I would refactor out your business logic (connecting to the database).

public class SqlOperation
{
    public SqlOperation()
    {
        Queries = new List<string>();
    }
    public string TableName { get; set; }
    public string ConnectionString { get; set; }
    public List<string> Queries { get; set; }
}
public static List<DataTable> GetAllData(IEnumerable<SqlOperation> sql)
{
    var taskArray =
        sql.SelectMany(s =>
            s.Queries
             .Select(query =>
                Task.Run(() => //Task.Factory.StartNew for .NET 4.0
                    ExecuteQuery(s.ConnectionString, s.TableName, query))))
            .ToArray();
    try
    {
        Task.WaitAll(taskArray);
    }
    catch(AggregateException e)
    {
        MessageBox.Show(e.ToString(), "GetAllData error");
    }
    return taskArray.Where(t => !t.IsFaulted).Select(t => t.Result).ToList();
}
public static DataTable ExecuteQuery(string connectionString, string tableName, string query)
{
    DataTable dataTable = null;
    using (var connection = new SqlConnection(connectionString))
    {
        dataTable = new DataTable();
        dataTable.TableName = tableName;
        using(var command = new SqlCommand(query, connection))
        {
            connection.Open();
            using(var adapter = new SqlDataAdapter())
            {
                adapter.SelectCommand = command;
                adapter.Fill(dataTable);
            }
        }
    }
     return dataTable;
}

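ADO.NET has a very smart connection pool, so in general you should just open and close a connection for each command and let the pool handle whether they are really opened or closed.

So, one connection per command: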

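  Parallel.ForEach(sql, s =>
            //foreach (Sql s in sql)
            {
                foreach (string q in s.queries)
                {
                    // Declare the connection as a local so that each parallel
                    // iteration works on its own instance.
                    using (var connection = new SqlConnection(s.connection))
                    {
                        connection.Open();
                        DataTable dt = new DataTable();
                        dt.TableName = s.name;
                        // The command is local as well, for the same reason.
                        var command = new SqlCommand(q, connection);
                        SqlDataAdapter adapter = new SqlDataAdapter();
                        adapter.SelectCommand = command;
                        adapter.Fill(dt);
                        //adapter.Dispose();
                        // List<T> is not thread-safe, so guard the shared list.
                        lock (data)
                        {
                            data.Add(dt);
                        }
                    }
                }
            });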
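
A likely cause of the intermittent error, assuming "connection" and "command" in the question are class-level fields (they are not declared inside GetAllData), is that every parallel iteration writes to the same field. One thread can then pick up a connection that another thread has just assigned and is still opening. The sketch below reproduces that interleaving without a database; FakeConnection and SharedFieldDemo are hypothetical stand-ins, not part of the original code, and the events only exist to force the unlucky ordering deterministically.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for SqlConnection so the sketch runs without a database.
class FakeConnection
{
    public string Owner { get; private set; }
    public FakeConnection(string owner) { Owner = owner; }
}

static class SharedFieldDemo
{
    // Shared by every thread, like a class-level "connection" field (assumed).
    static FakeConnection shared;

    // Returns { owner seen through the shared field, owner seen through the local }.
    public static string[] Run()
    {
        string viaField = null, viaLocal = null;
        using (var aAssigned = new ManualResetEventSlim(false))
        using (var bAssigned = new ManualResetEventSlim(false))
        {
            var a = Task.Run(() =>
            {
                var local = new FakeConnection("A"); // private to this iteration
                shared = local;                      // also published to the field
                aAssigned.Set();
                bAssigned.Wait();                    // let B overwrite the field
                viaField = shared.Owner;             // now refers to B's connection
                viaLocal = local.Owner;              // still A's connection
            });

            var b = Task.Run(() =>
            {
                aAssigned.Wait();
                shared = new FakeConnection("B");    // replaces A's reference
                bAssigned.Set();
            });

            Task.WaitAll(a, b);
        }
        return new[] { viaField, viaLocal };
    }

    static void Main()
    {
        var result = Run();
        // prints "field: B, local: A"
        Console.WriteLine("field: " + result[0] + ", local: " + result[1]);
    }
}
```

With real SqlConnection objects, thread A would build its command on B's connection while B is still inside Open(), which matches the "current state is connecting" message. Keeping the connection and command in local variables avoids the sharing entirely.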

You can also use MultipleActiveResultSets=true; in the connection string to allow multiple active readers on a single connection.
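
A minimal sketch of setting that option with SqlConnectionStringBuilder instead of editing the string by hand. MarsDemo and EnableMars are hypothetical names, and the example assumes the classic System.Data.SqlClient provider; the connection string is the placeholder from the question.

```csharp
using System;
using System.Data.SqlClient; // assumption: classic provider, as used in the question

static class MarsDemo
{
    // Returns a copy of the connection string with MARS switched on.
    public static string EnableMars(string connectionString)
    {
        var builder = new SqlConnectionStringBuilder(connectionString)
        {
            MultipleActiveResultSets = true
        };
        return builder.ConnectionString;
    }

    static void Main()
    {
        // Placeholder connection string taken from the question.
        string original =
            "Data Source=XXX-SQL1;Initial Catalog=XXXDB;Integrated Security=True";
        Console.WriteLine(EnableMars(original));
    }
}
```

Note that MARS only permits several active result sets on one connection; as far as I know it does not make a SqlConnection safe to use from several threads at once, so one connection per query remains the simpler fix for the Parallel.ForEach case.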