c#并行地完成每个任务

本文关键字:任务 并行 | 更新日期: 2023-09-27 18:02:40

我使用c# Parallel。ForEach可以处理上千个数据子集。一套需要5-30分钟来处理,这取决于套装的大小。在我的电脑中使用选项

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount

我将得到8个并行进程。据我所知,进程在并行任务之间平均分配(例如,第一个任务获得编号为1、9、17等的任务,第二个任务获得编号为2、10、18等的任务);因此,一个任务可以比其他任务更快地完成自己的工作。因为这些数据集比其他数据集花费的时间更少。

问题是四个并行任务在24小时内完成了它们的任务,但最后一个任务在48小时内完成。是否有可能组织并行性,使所有并行任务都能平等地完成?这意味着所有并行任务都继续工作,直到所有作业都完成。

c#并行地完成每个任务

由于作业是不相等的,所以您不能在处理器之间分配作业的数量并让它们几乎同时完成。我认为这里需要的是8个工作线程,可以在线检索下一个工作。你必须在函数上使用锁才能获得下一个任务。

如果我说错了,有人会纠正我,但是我突然想到……可以给工作线程一个这样的函数:
public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}

获得下一个任务的函数看起来像:

private List<Job> jobs;
private int currentJob = 0;
private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}

似乎没有现成的解决方案,必须创建。

我之前的代码是:

var ListOfSets = (from x in Database
           group x by x.SetID into z
           select new { ID = z.Key}).ToList();
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;
Parallel.ForEach(ListOfSets, po, SingleSet=>
{
     AnalyzeSet(SingleSet.ID);
});

为了在所有cpu之间平等地共享工作,我仍然使用Parallel来完成工作,但我使用For来代替ForEach和Matt的想法。新的代码是:

Parallel.For(0, Environment.ProcessorCount, i=>
{
    while(ListOfSets.Count() > 0)
    {
        double SetID = 0;
        lock (ListOfSets)
        {
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
     AnalyzeSet(SetID);
    }
});

谢谢你的建议

根据其他人的建议,一个选择是管理您自己的生产者消费者队列。我想指出的是,使用BlockingCollection使这个非常容易做到。

BlockingCollection<JobData> queue = new BlockingCollection<JobData>();
//add data to queue; if it can be done quickly, just do it inline.  
//If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);
queue.CompleteAdding();
for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}