Executing work packets in multiple threads

Updated: 2023-09-27 18:14:20

I am new to multithreading in C#/.NET and have the following requirement.

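Our application first fetches the customers (IDs only) that need data processing, and then creates WorkPackets according to a WorkPacket size (WorkPacket is our term for a group of customers bundled into one unit of work). Each work packet therefore contains a set of customers.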

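Each work packet created at runtime must then be executed on its own thread, so the threads have to be created and shut down at runtime. We chose a multithreaded approach because the number of customers to process is very large.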

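We have this working on a single thread, but can't figure out how to implement it with multiple threads. The code we use is below. Any suggestions?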

using System.Collections.Generic;
using System.Linq;

public class WorkAllocator
{
    // All packets produced for this run; each holds up to WorkPacketSize customer IDs.
    public List<WorkPacket> WorkPackets { get; set; }
    public int WorkPacketSize { get; set; }

    public WorkAllocator(int size)
    {
        WorkPacketSize = size;
        WorkPackets = new List<WorkPacket>();
        // DataManager and ConnectionString are the application's own type and setting (not shown).
        DataManager objDAL = new DataManager(ConnectionString);
        try
        {
            // Materialize once so the data query is not re-executed by later enumerations.
            List<string> customerIds = objDAL.GetData().ToList();
            // Split into consecutive chunks of WorkPacketSize IDs; only the last may be smaller.
            var chunks = customerIds
                .Select((c, i) => new { Index = i, Value = c })
                .GroupBy(c => c.Index / WorkPacketSize)
                .Select(g => g.Select(v => v.Value).ToList());
            foreach (List<string> chunk in chunks)
            {
                WorkPackets.Add(new WorkPacket(chunk));
            }
        }
        finally
        {
            // Release the data access object even if GetData() throws.
            objDAL.Dispose();
        }
    }
}
public class WorkPacket
{
    // The customer IDs that make up this unit of work.
    // (The property was renamed: a field and a property cannot both be called customerIds.)
    public List<string> CustomerIds { get; set; }

    public WorkPacket(List<string> cids)
    {
        CustomerIds = cids;
    }
}
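On .NET 6 or later, the Select/GroupBy split in WorkAllocator can also be written with Enumerable.Chunk, which yields consecutive arrays of at most the given size. For n IDs and packet size s, the number of packets is the ceiling of n / s, which in integer arithmetic is (n + s - 1) / s. A minimal sketch, assuming .NET 6+; PacketSplitter is a hypothetical helper, and an in-memory list stands in for DataManager.GetData():

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the question's code): splits customer IDs
// into packets of at most `size` IDs using Enumerable.Chunk (.NET 6+).
public static class PacketSplitter
{
    public static List<List<string>> Split(IEnumerable<string> customerIds, int size)
    {
        // Chunk yields consecutive arrays; only the last one can be shorter than `size`.
        return customerIds
            .Chunk(size)
            .Select(chunk => chunk.ToList())
            .ToList();
    }

    // Number of packets for n IDs: ceiling of n / size, using integer arithmetic only.
    public static int PacketCount(int n, int size) => (n + size - 1) / size;
}
```

For example, splitting 25 IDs with a size of 10 yields three packets holding 10, 10 and 5 IDs.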

Main method code:

var wa = new WorkAllocator(10);
if (wa.WorkPackets != null && wa.WorkPackets.Count > 0)
{
    // Goal: dynamically create a thread for each work packet and join once all
    // child threads have completed. As written, this loop still runs sequentially.
    foreach (WorkPacket wp in wa.WorkPackets)
    {
        // Processor.Run is assumed to be a static method that processes one packet.
        Processor.Run(wp);
    }
}
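The comment in the loop describes creating one thread per work packet and joining them all afterwards. Using System.Threading.Thread directly, that pattern can be sketched as follows; ThreadPerPacket and the processPacket delegate are hypothetical stand-ins for Processor.Run, and packets are plain lists of IDs. One dedicated thread per packet becomes expensive when there are many packets, which is why pooled approaches are usually preferred.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: one dedicated thread per packet, then Join on all of them.
public static class ThreadPerPacket
{
    public static void RunAll(IList<List<string>> packets, Action<List<string>> processPacket)
    {
        var threads = new List<Thread>();
        foreach (var packet in packets)
        {
            // Each iteration captures its own `packet` (C# 5 and later).
            var t = new Thread(() => processPacket(packet));
            threads.Add(t);
            t.Start();
        }
        // Join blocks until that thread ends; after this loop every packet is done.
        foreach (var t in threads)
        {
            t.Join();
        }
    }
}
```

Note that an unhandled exception on a dedicated Thread terminates the process, so processPacket should catch and record its own errors.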


A simple solution using Task:

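var wa = new WorkAllocator(10);
if (wa.WorkPackets != null && wa.WorkPackets.Count > 0)
{
    var tasks = new List<Task>();   // requires using System.Threading.Tasks;
    foreach (WorkPacket wp in wa.WorkPackets)
    {
        // Queue each packet to the ThreadPool.
        tasks.Add(Task.Run(() => Processor.Run(wp)));
    }
    // Block until every packet has been processed.
    Task.WaitAll(tasks.ToArray());
}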

This queues each Processor.Run call as a task on the ThreadPool, which decides how many run in parallel, and then blocks until all of them have finished.
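A self-contained version of the Task approach follows, with a hypothetical processPacket delegate in place of Processor.Run so that it runs on its own. It also shows what happens with errors: if any task throws, Task.WaitAll rethrows the failures wrapped in an AggregateException. The RunAllAsync variant uses Task.WhenAll for callers that can await instead of blocking a thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sketch of the Task-per-packet approach.
public static class TaskPerPacket
{
    // Queues one ThreadPool task per packet and blocks until all have completed.
    // If any task faults, Task.WaitAll throws an AggregateException with the errors.
    public static void RunAll(IEnumerable<List<string>> packets, Action<List<string>> processPacket)
    {
        Task[] tasks = packets
            .Select(packet => Task.Run(() => processPacket(packet)))
            .ToArray();
        Task.WaitAll(tasks);
    }

    // Non-blocking variant: the returned task completes when every packet is done.
    public static Task RunAllAsync(IEnumerable<List<string>> packets, Action<List<string>> processPacket)
    {
        return Task.WhenAll(packets.Select(packet => Task.Run(() => processPacket(packet))));
    }
}
```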

You might also look at ActionBlock (from TPL Dataflow), with which you can do something like this:

// Requires the System.Threading.Tasks.Dataflow package and: using System.Threading.Tasks.Dataflow;
var threadCount = Environment.ProcessorCount;
var actionBlock = new ActionBlock<WorkPacket>(packet => Processor.Run(packet),
    // Optional; the default MaxDegreeOfParallelism is 1.
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = threadCount });
var wa = new WorkAllocator(10);
if (wa.WorkPackets != null && wa.WorkPackets.Count > 0)
{
    foreach (WorkPacket wp in wa.WorkPackets)
        actionBlock.Post(wp);
    // Signal that no more packets will be posted, then wait for the block to drain.
    actionBlock.Complete();
    actionBlock.Completion.Wait();
}

In your Main method you can simply use Parallel.ForEach. It is blocking: it does not return until all work packets have been processed.

Parallel.ForEach(wa.WorkPackets, wp => Processor.Run(wp));
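Parallel.ForEach also accepts a ParallelOptions argument, so the number of packets processed at once can be capped, much like MaxDegreeOfParallelism on the ActionBlock. A sketch, with ParallelPackets and the processPacket delegate as hypothetical stand-ins for Processor.Run; the cap value is up to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch: Parallel.ForEach with an explicit concurrency cap.
public static class ParallelPackets
{
    // Processes packets on the ThreadPool with at most maxParallel running at once.
    // Blocks until every packet is done; failures surface as an AggregateException.
    public static void RunAll(IEnumerable<List<string>> packets,
                              Action<List<string>> processPacket,
                              int maxParallel)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        Parallel.ForEach(packets, options, packet => processPacket(packet));
    }
}
```

For example, passing Environment.ProcessorCount as maxParallel keeps at most one packet per logical processor in flight, which suits CPU-bound processing; I/O-bound work may tolerate a higher cap.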