在多个线程中执行可工作数据包
本文关键字:工作 数据包 执行 线程 | 更新日期: 2023-09-27 18:14:20
我是C#/.Net多线程概念的新手,我有以下要求。
我们的应用程序应该首先获取需要进行数据处理的客户(仅id(,并根据WorkPacket的大小(用于将一组客户打包为一个可工作的数据包的术语(创建WorkPacket。因此,每个工作包都包含一组客户
现在,在运行时创建的每个工作包都需要在不同的线程上执行。因此,所有线程都需要在运行时创建并中止。由于需要处理的客户数量非常巨大,我们采用了多线程概念。
我们有单线程的工作功能,但可以确定如何使用实现。下面是使用的代码。有什么建议吗???
public class WorkAllocator
{
private int workPacketSize;
private List<WorkPacket> workPackets;
public List<WorkPacket> WorkPackets
{
get { return workPackets; }
set { workPackets = value; }
}
public int WorkPacketSize
{
get { return workPacketSize; }
set { workPacketSize = value; }
}
public WorkAllocator(int size)
{
workPacketSize = size;
int noOfPackets=0;
DataManager objDAL = new DataManager(ConnectionString);
IEnumerable<string> CustomerIds = objDAL.GetData();
workPackets = new List<WorkPacket>();
if (CustomerIds.Count() > 0)
{
noOfPackets = CustomerIds.Count() / workPacketSize;
if (CustomerIds.Count() % workPacketSize != 0)
{
noOfPackets++;
}
var wps = CustomerIds
.Select((c, i) => new { Index = i, Value = c })
.GroupBy(c => c.Index / workPacketSize)
.Select(c => c.Select(v => v.Value).ToList())
.ToList();
foreach (List<string> wp in wps)
{
workPackets.Add(new WorkPacket(wp));
}
}
objDAL.Dispose();
}
}
public class WorkPacket
{
private List<string> customerIds;
public List<string> customerIds
{
get { return customerIds; }
set { customerIds = value; }
}
public WorkPacket(List<string> Cids)
{
customerIds = Cids;
}
}
主要方法代码
wa = new WorkAllocator(10);
if (wa.WorkPackets != null && wa.WorkPackets.Count > 0)
{
//Dynamically Create threads for each work packets and join after all child thread completes the activity
foreach (WorkPacket wp in wa.WorkPackets)
{
var _processor = new Processor();
Processor.Run(wp);
}
}
使用Task
的简单解决方案:
wa = new WorkAllocator(10);
if (wa.WorkPackets != null && wa.WorkPackets.Count > 0)
{
var tasks = new List<Task>();
foreach (WorkPacket wp in wa.WorkPackets)
{
tasks.Add(Task.Run(() =>
{
Processor.Run(wp);
});
}
Task.WaitAll(tasks.ToArray());
}
这将在其自己的线程中运行每个Processor
(由ThreadPool
管理的并行化(,然后等待所有线程完成。
也许也可以看看ActionBlock
,在那里你可以做一些类似的事情:
var threadCount = Environment.ProcessorCount;
var actionBlock = new ActionBlock<WorkPacket>(() => Processor.Run(wp),
// this is optional, but default is 1:
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = threadCount });
wa = new WorkAllocator(10);
if (wa.WorkPackets != null && wa.WorkPackets.Count > 0)
{
var tasks = new List<Task>();
foreach (WorkPacket wp in wa.WorkPackets)
actionBlock.Post(wp);
actionBlock.Complete();
actionBlock.Completion.Wait();
}
在您的主方法中,您可以简单地使用Parallel.ForEach。这是阻塞-在处理完所有工作包之前不会返回。
Parallel.ForEach(wa.WorkPackets, wp => Processor.Run(wp));