将Foreach划分为线程示例

本文关键字:线程 Foreach 划分 | 更新日期: 2023-09-27 18:29:08

我想在Foreach循环中用多线程运行"SearchResultByOrderNumber(string orderNumber)"方法。OrderNumbers数据表中有十个订单号。在OrderResults数据表中搜索这些订单号时,我想把它们分配给五个线程,每个线程负责搜索两个订单号。在ASP.NET 3.5框架下,如何实现这样的多线程处理?

我想,我必须重新表述一下我的问题。如何自动将"OrderNumbers"中的行划分给多个异步(Async)方法?首先,我得到行数rowCount;然后定义异步方法的数量asyncMethodCount;最后用rowCount除以asyncMethodCount,得到每个异步方法要处理的行数rowsPerAsyncMethods:

rowsPerAsyncMethods = rowCount / asyncMethodCount

谢谢。

/// <summary>
/// Starts MyTask1 and MyTask2 concurrently through asynchronous delegate
/// invocation, waits for both to complete, and then writes two marker lines.
/// </summary>
void Main()
{
    var myTask1Caller = new Func<DataTable>(MyTask1);
    var myTask2Caller = new Func<DataTable>(MyTask2);

    // Kick off both delegates before collecting either result so they overlap.
    IAsyncResult asyncResultMyTask1 = myTask1Caller.BeginInvoke(null, null);
    IAsyncResult asyncResultMyTask2 = myTask2Caller.BeginInvoke(null, null);

    // EndInvoke blocks until the corresponding call has finished, so no extra
    // AsyncWaitHandle.WaitOne() is needed afterwards. Touching AsyncWaitHandle
    // would only create wait handles that nobody closes.
    DataTable dtMyTask1 = myTask1Caller.EndInvoke(asyncResultMyTask1);
    DataTable dtMyTask2 = myTask2Caller.EndInvoke(asyncResultMyTask2);

    // NOTE(review): these print the literal names, not the table contents;
    // dtMyTask1 / dtMyTask2 are otherwise unused — confirm the intended output.
    Console.WriteLine("dtMyTask1");
    Console.WriteLine("dtMyTask2");
}
/// <summary>
/// Returns the number of rows in the table produced by <c>OrderNumbers()</c>.
/// </summary>
/// <returns>The row count of a freshly built order-number table.</returns>
public int RowCount()
{
    return OrderNumbers().Rows.Count;
}

/// <summary>
/// Looks up the order result for each of the first three rows returned by
/// <c>OrderNumbers()</c>.
/// </summary>
/// <returns>
/// A new table with an Int32 "OrderNumber" column and a string "OrderResult"
/// column, holding one row per processed order.
/// </returns>
public DataTable MyTask1()
{
    DataTable dtResult = new DataTable();
    dtResult.Columns.Add(new DataColumn("OrderNumber", typeof(System.Int32)));
    dtResult.Columns.Add(new DataColumn("OrderResult", typeof(string)));

    // Iterate the slice directly. CopyToDataTable() is deliberately avoided:
    // it copies every row for no benefit and throws InvalidOperationException
    // when the slice contains no rows.
    foreach (DataRow order in OrderNumbers().AsEnumerable().Take(3))
    {
        string orderNumber = order["OrderNumber"].ToString();
        string orderResult = SearchResultByOrderNumber(orderNumber);

        DataRow dr = dtResult.NewRow();
        dr["OrderNumber"] = orderNumber;
        dr["OrderResult"] = orderResult;
        dtResult.Rows.Add(dr);
    }

    return dtResult;
}

/// <summary>
/// Looks up the order result for rows four to six (skip three, take three)
/// of the table returned by <c>OrderNumbers()</c>.
/// </summary>
/// <returns>
/// A new table with an Int32 "OrderNumber" column and a string "OrderResult"
/// column, holding one row per processed order.
/// </returns>
public DataTable MyTask2()
{
    DataTable dtResult = new DataTable();
    dtResult.Columns.Add(new DataColumn("OrderNumber", typeof(System.Int32)));
    dtResult.Columns.Add(new DataColumn("OrderResult", typeof(string)));

    // Iterate the slice directly. CopyToDataTable() is deliberately avoided:
    // it copies every row for no benefit and throws InvalidOperationException
    // when the slice contains no rows (e.g. fewer than four orders exist).
    foreach (DataRow order in OrderNumbers().AsEnumerable().Skip(3).Take(3))
    {
        string orderNumber = order["OrderNumber"].ToString();
        string orderResult = SearchResultByOrderNumber(orderNumber);

        DataRow dr = dtResult.NewRow();
        dr["OrderNumber"] = orderNumber;
        dr["OrderResult"] = orderResult;
        dtResult.Rows.Add(dr);
    }

    return dtResult;
}
    /// <summary>
    /// Finds the "OrderResult" text of the first row in <c>OrderResults()</c>
    /// whose "OrderNumber", rendered as a string, equals <paramref name="orderNumber"/>.
    /// </summary>
    /// <param name="orderNumber">Order number to look for, in string form.</param>
    /// <returns>The matching result text, or <c>null</c> when no row matches.</returns>
    public string SearchResultByOrderNumber(string orderNumber)
    {
        DataTable results = OrderResults();

        return results.AsEnumerable()
                      .Where(row => row["OrderNumber"].ToString() == orderNumber)
                      .Select(row => row["OrderResult"].ToString())
                      .FirstOrDefault();
    }
    /// <summary>
    /// Builds the sample "OrderResults" table: order numbers 1 through 9, each
    /// paired with the text "&lt;number&gt; Result".
    /// </summary>
    /// <returns>
    /// A new table named "OrderResults" with an Int32 "OrderNumber" column and
    /// a string "OrderResult" column.
    /// </returns>
    public DataTable OrderResults()
    {
        DataTable table = new DataTable("OrderResults");
        table.Columns.Add(new DataColumn("OrderNumber", typeof(System.Int32)));
        table.Columns.Add(new DataColumn("OrderResult", typeof(string)));

        int number = 1;
        while (number < 10)
        {
            // Values are supplied in column order: OrderNumber, OrderResult.
            table.Rows.Add(new object[] { number, number + " Result" });
            number++;
        }

        return table;
    }

    /// <summary>
    /// Builds the sample "OrderNumbers" table containing the order numbers
    /// 0 through 9.
    /// </summary>
    /// <returns>
    /// A new table named "OrderNumbers" with a single Int32 "OrderNumber" column.
    /// </returns>
    public DataTable OrderNumbers()
    {
        DataTable table = new DataTable("OrderNumbers");
        table.Columns.Add(new DataColumn("OrderNumber", typeof(System.Int32)));

        int number = 0;
        while (number < 10)
        {
            table.Rows.Add(new object[] { number });
            number++;
        }

        return table;
    }

将Foreach划分为线程示例

如果.NET 4.0可用,您可以只使用Parallel.ForEach构造。

如果没有,那么并行处理它就像使用ThreadPool类一样简单,还需要一些额外的同步工作:

// Fan the per-order lookups out onto the thread pool, then block until every
// queued work item has finished. Expects dtOrders (input rows) and dtResult
// (output table) to be in scope.
int tasks = 0; // number of queued work items that have not finished yet
object locker = new object(); // guards `tasks` and every write to dtResult
foreach (var order1 in dtOrders.AsEnumerable())
{
    lock (locker) { tasks++; } // registered before queuing so the barrier cannot miss it
    var order = order1; // local copy so each closure captures its own row
    ThreadPool.QueueUserWorkItem(
        o =>
        {
            try
            {
                // The lookup is the only part that runs in parallel.
                string orderNumber = order["OrderNumber"].ToString();
                string orderResult = SearchResultByOrderNumber(orderNumber);

                // DataTable is only safe for concurrent reads: NewRow, the cell
                // assignments and Rows.Add all write to dtResult, so they must
                // all happen while holding the lock.
                lock (locker)
                {
                    DataRow dr = dtResult.NewRow();
                    dr["OrderNumber"] = orderNumber;
                    dr["OrderResult"] = orderResult;
                    dtResult.Rows.Add(dr);
                }
            }
            finally
            {
                // Always signal completion, even if the lookup threw, so the
                // waiting thread can never be left blocked on this work item.
                lock (locker)
                {
                    tasks--;
                    Monitor.Pulse(locker);
                }
            }
        });
}
// barrier to wait for all tasks to finish
lock (locker)
{
    while (tasks > 0)
    {
        Monitor.Wait(locker);
    }
}

您可以使用CountdownEvent来阻塞当前线程,直到计数为0:

// Fan the per-order lookups out onto the thread pool and block on a
// CountdownEvent until all of them have signalled. Expects dtOrders (input
// rows) and dtResult (output table) to be in scope.
// CountdownEvent requires .NET Framework 4.0 or later.
var sync = new object(); // guards every write to dtResult
using (var cd = new CountdownEvent(dtOrders.Rows.Count))
{
    // DataTable itself is not enumerable; iterate its Rows collection.
    foreach (DataRow order in dtOrders.Rows)
    {
        // Declared inside the loop body so each closure captures its own value.
        string orderNumber = order["OrderNumber"].ToString();
        ThreadPool.QueueUserWorkItem(o =>
        {
            try
            {
                // The lookup is the only part that runs in parallel.
                string orderResult = SearchResultByOrderNumber(orderNumber);

                // DataTable is only safe for concurrent reads: NewRow, the cell
                // assignments and Rows.Add all write to dtResult, so they must
                // all happen while holding the lock.
                lock (sync)
                {
                    var dr = dtResult.NewRow();
                    dr["OrderNumber"] = orderNumber;
                    dr["OrderResult"] = orderResult;
                    dtResult.Rows.Add(dr);
                }
            }
            finally
            {
                // Signal even on failure so cd.Wait() cannot block forever.
                cd.Signal();
            }
        });
    }
    cd.Wait();
}
  • 计数值是在构造函数中设置的
  • cd.Signal()将计数递减一
  • cd.Wait()阻塞当前线程,直到计数为0