C# Parallel.For 没有执行每一步
本文关键字:执行 每一步 平行 因为 | 更新日期: 2023-09-27 17:49:30
我一直在做一个导入服务的原型(mock-up),它目前按顺序运行。然而,我的原型似乎出现了一个奇怪的问题:有时 for 循环中的一两个项目没有被执行。
/// <summary>
/// Background import service. A dedicated worker thread polls the repository for an
/// import that is waiting to be processed and runs its work items through
/// <c>Parallel.For</c>, honouring cancellation requested via <see cref="Stop"/>.
/// </summary>
class Service
{
    private Thread _worker;

    // Written by Stop() on the caller's thread and read by the worker loop,
    // so it must be volatile for the write to be observed promptly.
    private volatile bool _stopping;

    private CancellationTokenSource _cts;
    private ParallelOptions _po;
    private Repository _repository;

    /// <summary>
    /// Starts the worker thread against the given repository.
    /// </summary>
    /// <param name="repository">Store that supplies imports and batches.</param>
    public void Start(Repository repository)
    {
        _repository = repository;

        // Reset the flag so the service can be started again after a Stop().
        _stopping = false;

        _cts = new CancellationTokenSource();
        _po = new ParallelOptions
        {
            CancellationToken = _cts.Token
        };

        _worker = new Thread(ProcessImport);
        _worker.Start();
    }

    /// <summary>
    /// Requests cancellation and blocks until the worker thread has exited.
    /// Safe to call even if <see cref="Start"/> was never called.
    /// </summary>
    public void Stop()
    {
        _stopping = true;

        // _cts only exists once Start() has run.
        if (_cts != null)
        {
            _cts.Cancel();
        }

        if (_worker != null && _worker.IsAlive)
        {
            _worker.Join();
        }
    }

    /// <summary>
    /// Worker loop: fetches the next pending import, processes it in parallel and,
    /// if the run is cancelled, releases the batch so it can be picked up again.
    /// </summary>
    private void ProcessImport()
    {
        while (!_stopping)
        {
            var import = _repository.GetInProgressImport();
            if (import == null)
            {
                // Nothing to do yet; poll again in a second.
                Thread.Sleep(1000);
                continue;
            }

            try
            {
                Parallel.For(0, 1000, _po, i => Work.DoWork(i, import, _cts.Token, _repository));
            }
            catch (OperationCanceledException)
            {
                // Unmark batch so it can be started again
                var batch = _repository.GetBatch(import.BatchId);
                batch.Processing = false;
                _repository.UpdateBatch(batch);
                Console.WriteLine("Aborted import {0}", import.ImportId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Something went wrong: {0}", ex.Message);
            }
        }
    }
}
/// <summary>
/// Holds the per-iteration body that Service.ProcessImport passes to Parallel.For.
/// This is the version from the question, i.e. the one that exhibits the lost updates.
/// </summary>
class Work
{
/// <summary>
/// Performs one simulated unit of work and then records it on the import's batch.
/// </summary>
/// <param name="i">Loop index supplied by the caller; not read by this method.</param>
/// <param name="import">Import whose BatchId selects the batch to update.</param>
/// <param name="ct">Token checked between the simulated work steps.</param>
/// <param name="repository">Store used to fetch and write back the batch.</param>
public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
{
// Simulate doing some work
Thread.Sleep(100);
HandleAbort(ct);
Thread.Sleep(100);
HandleAbort(ct);
Thread.Sleep(100);
// Update the batch
// NOTE(review): nothing in this block synchronizes the fetch / increment / write-back
// sequence below. When DoWork runs concurrently, two callers can read the same
// Processed value and one increment is lost, which would explain the 998/999 totals.
var batch = repository.GetBatch(import.BatchId);
batch.Processed++;
// Only the call that brings Processed exactly to Total stamps the batch as finished.
if (batch.Processed == batch.Total)
{
batch.Finished = DateTime.Now;
batch.Processing = false;
}
repository.UpdateBatch(batch);
}
/// <summary>
/// Returns immediately when no cancellation has been requested; otherwise lets the
/// token throw via ThrowIfCancellationRequested.
/// </summary>
private static void HandleAbort(CancellationToken ct)
{
if (!ct.IsCancellationRequested)
return;
ct.ThrowIfCancellationRequested();
}
}
使用这段代码,我经常发现批处理永远不会完成,此时 batch.Processed 的值为 999 或 998。
有谁能告诉我我做错了什么吗?
提前感谢。
编辑:为了说明存储库/批处理对象——我相信在我当前的原型中,它是线程安全的:
/// <summary>
/// In-memory store for imports and batches, backed by <see cref="ConcurrentBag{T}"/>.
/// The bags make adding and enumerating safe across threads, but the stored
/// <c>Batch</c> instances are handed out and mutated directly: a read-modify-write
/// spanning <see cref="GetBatch"/> and <see cref="UpdateBatch"/> must be
/// synchronized by the caller.
/// </summary>
class Repository
{
    private readonly ConcurrentBag<Batch> _batchData = new ConcurrentBag<Batch>();
    private readonly ConcurrentBag<Import> _importData = new ConcurrentBag<Import>();

    /// <summary>Adds an import to the store.</summary>
    public void CreateImport(Import import)
    {
        _importData.Add(import);
    }

    /// <summary>
    /// Picks the import whose batch has the most outstanding work and is not already
    /// being processed, marks that batch as processing and returns the import.
    /// </summary>
    /// <returns>The selected import, or <c>null</c> when nothing is pending.</returns>
    public Import GetInProgressImport()
    {
        var import = _importData
            .Join(_batchData, i => i.BatchId, b => b.BatchId, (i, b) => new
            {
                Import = i,
                Batch = b
            })
            .Where(j => j.Batch.Processed < j.Batch.Total && !j.Batch.Processing)
            .OrderByDescending(j => j.Batch.Total - j.Batch.Processed)
            // Tie-break on the batch id. The previous key (BatchId - BatchId) was
            // always zero and therefore never influenced the ordering.
            .ThenBy(j => j.Batch.BatchId)
            .Select(j => j.Import)
            .FirstOrDefault();

        if (import == null)
        {
            return null;
        }

        // mark the batch as processing
        var batch = GetBatch(import.BatchId);
        batch.Processing = true;
        UpdateBatch(batch);
        return import;
    }

    /// <summary>Returns a snapshot list of all imports.</summary>
    public List<Import> ListImports()
    {
        return _importData.ToList();
    }

    /// <summary>Adds a batch to the store.</summary>
    public void CreateBatch(Batch batch)
    {
        _batchData.Add(batch);
    }

    /// <summary>
    /// Looks up a batch by id. The stored instance itself is returned, not a copy.
    /// </summary>
    /// <returns>The matching batch, or <c>null</c> when no batch has that id.</returns>
    public Batch GetBatch(Int64 batchId)
    {
        return _batchData.FirstOrDefault(b => b.BatchId == batchId);
    }

    /// <summary>
    /// Copies the mutable fields of <paramref name="batch"/> onto the stored batch with
    /// the same id. <c>Created</c> is not copied.
    /// </summary>
    /// <exception cref="InvalidOperationException">No stored batch has that id.</exception>
    public void UpdateBatch(Batch batch)
    {
        // When the caller passes the instance obtained from GetBatch, these are
        // self-assignments; they only matter for detached copies.
        var batchData = _batchData.First(b => b.BatchId == batch.BatchId);
        batchData.Total = batch.Total;
        batchData.Processed = batch.Processed;
        batchData.Started = batch.Started;
        batchData.Finished = batch.Finished;
        batchData.Processing = batch.Processing;
    }
}
/// <summary>
/// A single import job; <see cref="BatchId"/> links it to the batch that tracks
/// its progress.
/// </summary>
class Import
{
    /// <summary>Identifier of this import.</summary>
    public long ImportId { get; set; }

    /// <summary>Identifier of the batch this import belongs to.</summary>
    public long BatchId { get; set; }
}
/// <summary>
/// Progress record for a batch of work items belonging to an import.
/// </summary>
class Batch
{
    /// <summary>Identifier used to look the batch up in the repository.</summary>
    public long BatchId { get; set; }

    /// <summary>Number of work items the batch consists of.</summary>
    public int Total { get; set; }

    /// <summary>Number of work items recorded as done so far.</summary>
    public int Processed { get; set; }

    /// <summary>Creation timestamp.</summary>
    public DateTime Created { get; set; }

    /// <summary>Start timestamp.</summary>
    public DateTime Started { get; set; }

    /// <summary>Timestamp set once <see cref="Processed"/> reaches <see cref="Total"/>.</summary>
    public DateTime Finished { get; set; }

    /// <summary>True while a worker has claimed the batch.</summary>
    public bool Processing { get; set; }
}
这只是一个原型(mock-up),所以在我的存储库后面没有数据库或其他持久化机制。
同样,我并不是用 i 的值来判断批处理是否完成,而是用批处理对象的 Processed 属性所指示的循环迭代次数(实际完成的工作量)来判断。
感谢解决方案:
我忘记了需要同步批处理的更新。应该看起来像:
/// <summary>
/// Per-iteration work body with the batch update serialized, so that concurrent
/// callers cannot lose increments of <c>Processed</c>.
/// </summary>
class Work
{
    // Lock objects must never be reassigned, otherwise two threads could end up
    // locking on different instances; hence readonly.
    private static readonly object _sync = new object();

    /// <summary>
    /// Performs one simulated unit of work and then records it on the import's batch.
    /// </summary>
    /// <param name="i">Loop index supplied by the caller; not used by this method.</param>
    /// <param name="import">Import whose <c>BatchId</c> selects the batch to update.</param>
    /// <param name="ct">Token checked between the simulated work steps.</param>
    /// <param name="repository">Store used to fetch and write back the batch.</param>
    /// <exception cref="OperationCanceledException">Cancellation was requested on <paramref name="ct"/>.</exception>
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {
        // Do work
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        // The fetch / increment / write-back sequence is a read-modify-write and
        // has to run as one critical section.
        lock (_sync)
        {
            // Update the batch
            var batch = repository.GetBatch(import.BatchId);
            batch.Processed++;
            if (batch.Processed == batch.Total)
            {
                batch.Finished = DateTime.Now;
                batch.Processing = false;
            }
            repository.UpdateBatch(batch);
        }
    }

    /// <summary>
    /// Throws <see cref="OperationCanceledException"/> when cancellation has been
    /// requested; otherwise does nothing.
    /// </summary>
    private static void HandleAbort(CancellationToken ct)
    {
        // ThrowIfCancellationRequested already performs the IsCancellationRequested
        // check, so no separate guard is needed.
        ct.ThrowIfCancellationRequested();
    }
}
看起来 batch.Processed 上丢失了更新。自增操作不是原子的:batch.Processed++; 存在竞态条件(racy)。请使用 Interlocked.Increment。
根据 MSDN,Parallel.For 的重载把第二个整数参数指定为 toExclusive,表示循环上界不包含该值。换句话说,999 是预期的结果(最大索引),而不是 1000——但也要注意,由于从 0 开始,循环确实执行了 1000 次。
乍一看,你的代码是并行执行的,所以请确认你看到的不是"999"那次调用排在了"998"那次调用之前——由于并行执行,你的代码本质上是无序的,各次迭代很容易以非常随机的顺序完成。另外,请仔细阅读 lock 的相关文档,因为你的代码可能正在访问本应等待其他线程完成后才能访问的值。