c#平行.因为没有执行每一步

本文关键字:执行 每一步 平行 因为 | 更新日期: 2023-09-27 17:49:30

我一直在做一个导入服务的模型,它目前按顺序运行。然而,我的模型似乎出现了一个奇怪的问题,有时for循环中的一两个项目没有执行。

class Service
{
    private Thread _worker;
    private bool _stopping;        
    private CancellationTokenSource _cts;
    private ParallelOptions _po;
    private Repository _repository;
    public void Start(Repository repository)
    {
        _repository = repository;
        _cts = new CancellationTokenSource();            
        _po = new ParallelOptions { 
            CancellationToken = _cts.Token
        };
        _worker = new Thread(ProcessImport);
        _worker.Start();            
    }
    public void Stop()
    {
        _stopping = true;
        _cts.Cancel();
        if(_worker != null && _worker.IsAlive)
            _worker.Join();            
    }
    private void ProcessImport()
    {
        while (!_stopping)
        {
            var import = _repository.GetInProgressImport();
            if (import == null)
            {
                Thread.Sleep(1000);
                continue;
            }
            try
            {
                Parallel.For(0, 1000, _po, i => Work.DoWork(i, import, _cts.Token, _repository));
            }
            catch (OperationCanceledException)
            {
                // Unmark batch so it can be started again
                batch = _repository.GetBatch(import.BatchId);
                batch.Processing = false;
                _repository.UpdateBatch(batch);
                Console.WriteLine("Aborted import {0}", import.ImportId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Something went wrong: {0}", ex.Message);
            }         
        }         
    }
}
class Work
{
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {         
        // Simulate doing some work
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        // Update the batch
        var batch = repository.GetBatch(import.BatchId);
        batch.Processed++;
        if (batch.Processed == batch.Total)
        {
            batch.Finished = DateTime.Now;
            batch.Processing = false;                
        }            
        repository.UpdateBatch(batch);            
    }
    private static void HandleAbort(CancellationToken ct)
    {
        if (!ct.IsCancellationRequested) 
            return;
        ct.ThrowIfCancellationRequested();
    }
}

使用这段代码,我经常发现批处理永远不会完成,而那个批处理。已处理= 999或998.

有谁能告诉我我做错了什么吗?

提前感谢。

编辑:

为了明确存储库/批处理对象-我相信在我当前的模型中,它是线程安全的

class Repository
{
    private ConcurrentBag<Batch> _batchData = new ConcurrentBag<Batch>();
    private ConcurrentBag<Import> _importData = new ConcurrentBag<Import>();
    public void CreateImport(Import import)
    {
        _importData.Add(import);
    }
    public Import GetInProgressImport()
    {
        var import = _importData
            .Join(_batchData, i => i.BatchId, b => b.BatchId, (i, b) => new
            {
                Import = i,
                Batch = b
            })
            .Where(j => j.Batch.Processed < j.Batch.Total && !j.Batch.Processing)
            .OrderByDescending(j => j.Batch.Total - j.Batch.Processed)
            .ThenBy(j => j.Batch.BatchId - j.Batch.BatchId)
            .Select(j => j.Import)                
            .FirstOrDefault();
        if (import == null)
            return null;
        // mark the batch as processing
        var batch = GetBatch(import.BatchId);
        batch.Processing = true;
        UpdateBatch(batch);
        return import;
    }
    public List<Import> ListImports()
    {
        return _importData.ToList();
    }
    public void CreateBatch(Batch batch)
    {
        _batchData.Add(batch);
    }
    public Batch GetBatch(Int64 batchId)
    {
        return _batchData.FirstOrDefault(b => b.BatchId == batchId);
    }
    public void UpdateBatch(Batch batch)
    {
        var batchData = _batchData.First(b => b.BatchId == batch.BatchId);
        batchData.Total = batch.Total;
        batchData.Processed = batch.Processed;
        batchData.Started = batch.Started;
        batchData.Finished = batch.Finished;
        batchData.Processing = batch.Processing;
    }
}
class Import
{
    public Int64 ImportId { get; set; }
    public Int64 BatchId { get; set; }
}
class Batch
{
    public Int64 BatchId { get; set; }
    public int Total { get; set; }
    public int Processed { get; set; }
    public DateTime Created { get; set; }
    public DateTime Started { get; set; }
    public DateTime Finished { get; set; }   
    public bool Processing { get; set; }   
}

这只是一个模型,所以在我的存储库后面没有DB或其他持久性。

同样,我不是在用I的值来竞争我的批处理,而是用批处理对象的Processed属性所指示的循环迭代次数(实际完成的工作)来竞争。

感谢

解决方案:

我忘记了需要同步批处理的更新。应该看起来像:

class Work
{
    private static object _sync = new object();
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {       
        // Do work            
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        lock (_sync)
        {
            // Update the batch
            var batch = repository.GetBatch(import.BatchId);
            batch.Processed++;
            if (batch.Processed == batch.Total)
            {
                batch.Finished = DateTime.Now;
                batch.Processing = false;
            }
            repository.UpdateBatch(batch);
        }
    }
    private static void HandleAbort(CancellationToken ct)
    {
        if (!ct.IsCancellationRequested) 
            return;
        ct.ThrowIfCancellationRequested();
    }
}

c#平行.因为没有执行每一步

看起来batch.Processed上丢失了更新。增量不是原子的。batch.Processed++;是活泼的。使用Interlocked.Increment

在我看来,你现在似乎对线程没有很好的理解。在没有很好的理解的情况下执行这种复杂的线程是非常危险的。你所犯的错误很难测试出来,但是产品会发现它们。

根据MSDN, Parallel.For的过载指定第二个整数为toExclusive,表示上升到但不满足该值。换句话说,999是预期的结果,而不是1000——但也要注意,从"0"开始,循环确实执行了1000次。

乍一看,你的代码是并行的,所以确保你没有看到"999"调用打乱了"998"调用的顺序——这是因为并行执行,你的代码本质上是无序的,很容易以非常随机的重新排列而结束。另外,请仔细阅读lock,因为您的代码可能正在访问应该等待的值。