在这种情况下,锁是否保证多个修改是跨线程的原子性修改?
本文关键字:修改 线程 原子性 这种情况下 是否 | 更新日期: 2023-09-27 18:04:38
我正试图确定一个解决方案,我所面临的问题概述了这个程序员。本身的问题。我现在面临的具体问题是,我需要以某种方式对来自System.Collections.Concurrent
名称空间的集合进行多个原子修改。据我所知,目前还没有适当的机制来做到这一点;并发集合只保证单个操作是原子性的。
鉴于此,我想到了另一种可能的解决方案,即使用提供的开箱即用集合。我的解决方案是使用lock
来控制对执行多个修改的代码部分的访问,以便它们不会相互交错。
public interface IWork { }
public interface IResource { }
public sealed class WorkPerformer
{
public static WorkPerformer Instance { get { return lazyInstance.Value; } }
public static readonly Lazy<WorkPerformer> lazyInstance = new Lazy<WorkPerformer>(() => new WorkPerformer());
private ConcurrentDictionary<IResource, ConcurrentQueue<Guid>> IResourceWaitQueues { get; set; }
private ConcurrentDictionary<IWork, ConcurrentDictionary<IResource, Guid>> IWorkToPerform { get; set; }
private readonly object _LockObj = new object();
private WorkPerformer()
{
IResourceWaitQueues = new ConcurrentDictionary<IResource, ConcurrentQueue<Guid>>();
IWorkToPerform = new ConcurrentDictionary<IWork, ConcurrentDictionary<IResource, Guid>>();
}
private void ModifierTask_MultipleAdds(IWork workToDo)
{
Task.Run(() =>
{
lock(_LockObj)
{
// -- The point is here I am making multiple additions to IResourceWaitQueues and IWorkToPerform
// Find all IResource this IWork uses and generate a Guid for each
// Enqueue these Guid into their respective ConcurrentQueue's within IResourceWaitQueues
// Add this IWork and IResource -> Guid mapping into IWorkToPerform
}
});
}
public void ModifierTask_MultipleRemoves(IWork workThatsDone)
{
Task.Run(() =>
{
lock (_LockObj)
{
// -- The point is here I am making multiple deletions to IResourceWaitQueues and IWorkToPerform
// Find all IResource that this IWork used to perform its work
// Dequeue from the ConcurrentQueue respective to each IResource used from IResourceWaitQueues
// Remove this ITask KeyValuePair from IWorkToPerform
}
});
}
}
我想知道这个解决方案是否可以在上面的示例代码中允许对IResourceWaitQueues
和IWorkToPerform
进行多个原子操作?
我不得不假设,如果lock
有多个争用,有时它可能会变慢。但除此之外,如果我正确理解lock
,这些我希望执行的多个修改不应该相互交错,因为在lock
ed代码中一次只允许一个线程。
我看到的唯一另一个问题是我认为我必须在上面示例代码中对IResourceWaitQueues
和IWorkToPerform
的每一次其他访问上使用lock
?当然,除非访问可以与代码的lock
部分交叉。
EDIT:这里是一个更完整的代码示例,有一些,希望,对我试图解决的确切问题有帮助的评论。同样,作为参考,本程序中概述了对问题和解决方案的另一种解释。我问的这个问题。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace WorkProcessorSandbox
{
public interface IResource { }
public interface IWork
{
void ProcessWork();
List<IResource> NeededResources { get; set; }
}
// This classes purpose is to process IWork objects by calling their ProcessWork methods when it is found that
// the IResources they need to process are free. The nature of IResource objects is that they are not threadsafe
// (though some may be; some must be if an IResource appears in NeededResources multiple times). As a result
// care must be taken to make sure two IWork do not try to use a distinct IResource simultaneously.
// This is done by a sort of signalling/ticketing system. Each time a new IWork comes in to be processed it "lines
// up" for the IResources it needs. Only when it is at the front of the line for all IResource it needs will it
// move on to process. By forcing atomicity on the "lining up" of the IWork for IResources deadlocks and race conditions
// can be prevented because the order of an IWork "ticket" in a line can never interleave anothers.
public sealed class WorkProcessor
{
// Singleton class
public static WorkProcessor Instance { get { return lazyInstance.Value; } }
public static readonly Lazy<WorkProcessor> lazyInstance = new Lazy<WorkProcessor>(() => new WorkProcessor());
// ResourceWaitQueues holds a Queue of Guids mapped to distinct
// IResources representing the next IWork that is in line to use it
private readonly object _Lock_ResourceDict = new object();
private Dictionary<IResource, Queue<Guid>> ResourceWaitQueues { get; set; }
// WorkToProcess holds a Dictionary of Guid mapped to IResources representing
// the place in line this IWork (that said Dictionary is mapped to) is in for use of the IResources.
private readonly object _Lock_WorkDict = new object();
private Dictionary<IWork, Dictionary<IResource, Guid>> WorkToProcess { get; set; }
private WorkProcessor()
{
Running = false;
}
private bool Running { get; set; }
private CancellationToken ProcessingToken { get; set; }
private CancellationTokenSource ProcessingTokenSource { get; set; }
// Stops the processing of IWork from the WorkToProcess Dictionary
public void StopProcessing()
{
if (Running)
{
ProcessingTokenSource.Cancel();
Running = false;
}
}
// Starts (Allows) the processing of IWork from the WorkToProcess Dictionary
public void StartProcessing()
{
if (!Running)
{
// Instantiate to Empty
ResourceWaitQueues = new Dictionary<IResource, Queue<Guid>>();
WorkToProcess = new Dictionary<IWork, Dictionary<IResource, Guid>>();
// Create CancellationToken for use in controlling Tasks
ProcessingTokenSource = new CancellationTokenSource();
ProcessingToken = ProcessingTokenSource.Token;
Running = true;
}
}
// The purpose of this method is to compare the list of Guids at the front of the Queues in ResourceWaitQueues
// to the list of Guids that each IWork is waiting on for it to start processing.
// If the Guids that an IWork needs to start processing is present in the list of Guids at the front of the Queues
// then the IWork can start processing, otherwise it cannot.
private void TryProcessWork()
{
if(Running)
{
// A Task that will go through all of the IWork waiting to be
// processed and start processing the IWork objects that are ready.
Task.Run(() =>
{
// Here we need to lock on both the ResourceWaitQueues and WorkToProcess locks
lock (_Lock_ResourceDict) {
lock (_Lock_WorkDict)
{
// Go through the Dictionary of IWork waiting to be processed
foreach (var waitingWork in WorkToProcess)
{
// Find the List<Guid> that are needed for this IWork to be processed
var worksGuids = waitingWork.Value.Select(x => x.Value).ToList();
// Find the List<Guid> that are currently ready to be processed
var guidsReadyToProcess = ResourceWaitQueues.Values.Select(x =>
{
// If a Queue<T> is Empty when it is Peek'd it throws and Exception!
if (x.Count > 0)
return x.Peek();
return Guid.Empty;
}).ToList();
// If the List<Guid> needed by this IWork is contained within the List<Guid> ready to be processed
if (worksGuids.All(x => guidsReadyToProcess.Contains(x)))
{
// This IWork is ready to be processed!
ProcessWork(waitingWork);
// Remove this IWork from WorkToProcess
if (!WorkToProcess.Remove(waitingWork.Key))
{
Console.Out.WriteLine("Fatal error! Stopping work processing. Could not remove IWork from Dictionary that should contain it.");
StopProcessing();
break;
}
}
}
}
}
}, ProcessingToken);
}
}
// The purpose of this function is to "enqueue" IWork for processing. First a list of all the IResources
// that the IWork needs to process is created along with a Guid for each unique IResource it uses.
// These Guids are then enqueued into the respective Queue in ResourceWaitQueues representing this IWork's
// "spot in line" to use those specific IResources. Finally the IWork and its Guids are then added to the
// WorkToPerform Dictionary so that TryProcessWork can determine if it is ready to run or not.
// TryProcess is called at the end to see if this IWork is possibly ready to process right away.
public void EnqueueWork(IWork workToDo)
{
if (Running)
{
// Get all distinct IResource in the IWork's NeededResources
var worksResources = workToDo.NeededResources.Distinct().ToList();
// Create the Guids this IWork object will wait on to start processing
Dictionary<IResource, Guid> worksGuidResourceMap = new Dictionary<IResource, Guid>();
worksResources.ForEach(x => worksGuidResourceMap.Add(x, Guid.NewGuid()));
// Here we need to lock on both the ResourceWaitQueues and WorkToProcess locks
lock (_Lock_ResourceDict) {
lock (_Lock_WorkDict)
{
// Find all of the IResources that are not currently present in the ResourceWaitQueues Dictionary
var toAddResources = worksResources.Where(x => !ResourceWaitQueues.Keys.Contains(x)).ToList();
// Create a new entry in ResourceWaitQueues for these IResources
toAddResources.ForEach(x => ResourceWaitQueues.Add(x, new Queue<Guid>()));
// Add each Guid for this works IResources into the Queues in ResourceWaitQueues
foreach (var aGuidResourceMap in worksGuidResourceMap)
{
foreach (var resourceQueue in ResourceWaitQueues)
{
if (aGuidResourceMap.Key == resourceQueue.Key)
resourceQueue.Value.Enqueue(aGuidResourceMap.Value);
}
}
// Add this IWork and its processing info to the Dictionary of awaiting IWork to be processed
WorkToProcess.Add(workToDo, worksGuidResourceMap);
}
}
// Go through the list of IWork waiting to be processed and start processing IWork that is ready
TryProcessWork();
}
}
// The purpose of this function is to create a Task in which the IWork passed to it can be processed.
// Once the processing is complete the Task then dequeues a single Guid from the Queue respective to
// each IResource it needed to process. It then calls TryProcessWork because it is most likely possible
// there is some IWork that is now ready to process.
private void ProcessWork(KeyValuePair<IWork, Dictionary<IResource, Guid>> workToProcess)
{
Task.Run(() =>
{
// Actually perform the work to be processed.
workToProcess.Key.ProcessWork();
// Get the list of the IResources that were used during processing
var usedResources = workToProcess.Value.Select(x => x.Key).ToList();
// We are removing multiple Guids from the ResourceWaitQueues. They must be atomic.
// The ResourceWaitQueues could become incoherent if any other operations are performed on it during the dequeueing.
// It is ok for WorkToProcess to be modified while this is happening.
lock (_Lock_ResourceDict)
{
// Get the Queues corresponding to these IResources
var resourceQueues = ResourceWaitQueues.Where(x => usedResources.Contains(x.Key)).Select(x => x.Value).ToList();
try
{
// Dequeue a Guid from each of these Queues exposing the next Guid to be processed on each
resourceQueues.ForEach(x => x.Dequeue());
}
catch (InvalidOperationException ex)
{
Console.Out.WriteLine("Fatal error! Stopping work processing. Could not dequeue a Guid that should exist: " + ex.Message);
StopProcessing();
}
}
// Go through the list of IWork waiting to be processed and start processing IWork that is ready
TryProcessWork();
}, ProcessingToken);
}
}
}
如果没有一个良好的最小化、完整和可验证的代码示例来准确地说明您的场景,就不可能肯定地说。但根据您到目前为止的描述,似乎相当清楚,使用lock
将解决您的主要问题(一些分组操作系列的原子性)
是否需要对相同对象的所有其他访问也使用lock
取决于这些访问是什么以及它们与lock
保护的组操作的关系。当然,没有必要仅仅为了确保集合保持一致而使用lock
。它们的内部同步将确保。
但如果你分组业务代表本身某种类型的一致性,在其他访问的对象是不正确的,如果他们被允许发生当一个分组操作在进步,是的,您还需要使用lock
, _LockObj
相同的参考,以确保分组操作不能在同一时间发生其他访问发生取决于聚合数据结构的一致性。
如果你需要更具体的建议,请改进问题,以便清楚所有这些操作实际上是如何关联的。
Aside:您可能需要考虑遵循正常的。net编码约定:将Pascal大小写的使用限制在方法和属性上,并对字段使用骆驼式大小写。这将使读者更容易理解你的代码。
我可以肯定地说,使用。net接口命名约定的字段(即pascal -case标识符,总是以I
开头)是一个非常糟糕的选择。当你这样做的时候,你肯定会让人们很难理解你的代码。
为了使性能最大化,您应该避免在IWork.ProcessWork
方法期间锁定X个IResource
对象。问题是,如果您有一个IWork
对象需要10个IResource
对象,其中9个资源可能只需要几毫秒来处理,而第10个资源可能需要几分钟,在这种情况下,所有10个资源对象都将被锁定,以便没有其他IWork
对象可以使用它们完成工作所需的全部时间。
通过创建LockResource
方法和ReleaseResource
方法,您可以使用ConcurrentDictionary
,因为它的设计不需要在lock
中包装它,因为您将只执行原子操作,即将IResource
添加到ResourceWaitQueue
并从ResourceWaitQueue
中删除IResource
。这将允许您的IWork
对象以一种有效的方式执行,其中唯一的瓶颈是实际资源而不是代码。