
本文关键字:修改 线程 原子性 这种情况下 是否 | 更新日期: 2023-09-27 18:04:38




// Marker interface (declares no members) for a unit of work.
// WorkPerformer below uses it as the key type of its IWorkToPerform dictionary.
public interface IWork { }
// Marker interface (declares no members) for a resource that work items use.
// WorkPerformer below uses it as the key type of its IResourceWaitQueues dictionary.
public interface IResource { }
/// <summary>
/// Singleton sketch that tracks which <see cref="IWork"/> items are waiting on which
/// <see cref="IResource"/> objects. Each grouped modification touches BOTH dictionaries,
/// so the individual thread-safety of the concurrent collections is not enough: every
/// grouped modification must run under the same lock (<c>_LockObj</c>) so that two of
/// them can never interleave.
/// </summary>
public sealed class WorkPerformer
{
    /// <summary>The single, lazily created instance.</summary>
    public static WorkPerformer Instance { get { return lazyInstance.Value; } }
    public static readonly Lazy<WorkPerformer> lazyInstance = new Lazy<WorkPerformer>(() => new WorkPerformer());

    // Per-resource line of tickets (Guids); the Guid at the front owns the resource next.
    private ConcurrentDictionary<IResource, ConcurrentQueue<Guid>> IResourceWaitQueues { get; set; }

    // For every pending work item: the ticket it holds for each resource it needs.
    private ConcurrentDictionary<IWork, ConcurrentDictionary<IResource, Guid>> IWorkToPerform { get; set; }

    // Guards every multi-step modification spanning both dictionaries above.
    private readonly object _LockObj = new object();

    private WorkPerformer()
    {
        IResourceWaitQueues = new ConcurrentDictionary<IResource, ConcurrentQueue<Guid>>();
        IWorkToPerform = new ConcurrentDictionary<IWork, ConcurrentDictionary<IResource, Guid>>();
    }

    /// <summary>
    /// Registers <paramref name="workToDo"/> on a background task: several additions to
    /// both dictionaries that must be observed as one atomic step.
    /// </summary>
    /// <param name="workToDo">The work item to register.</param>
    private void ModifierTask_MultipleAdds(IWork workToDo)
    {
        Task.Run(() =>
        {
            // Same lock object as ModifierTask_MultipleRemoves, so adds and removes
            // can never interleave with each other.
            lock (_LockObj)
            {
                // -- The point is here I am making multiple additions to IResourceWaitQueues and IWorkToPerform
                // Find all IResource this IWork uses and generate a Guid for each
                // Enqueue these Guid into their respective ConcurrentQueue's within IResourceWaitQueues
                // Add this IWork and IResource -> Guid mapping into IWorkToPerform
            }
        });
    }

    /// <summary>
    /// Unregisters <paramref name="workThatsDone"/> on a background task: several removals
    /// from both dictionaries that must be observed as one atomic step.
    /// </summary>
    /// <param name="workThatsDone">The work item that has finished.</param>
    public void ModifierTask_MultipleRemoves(IWork workThatsDone)
    {
        Task.Run(() =>
        {
            lock (_LockObj)
            {
                // -- The point is here I am making multiple deletions to IResourceWaitQueues and IWorkToPerform
                // Find all IResource that this IWork used to perform its work
                // Dequeue from the ConcurrentQueue respective to each IResource used from IResourceWaitQueues
                // Remove this IWork KeyValuePair from IWorkToPerform
            }
        });
    }
}


我不得不假设,如果 lock 上存在大量争用,有时它可能会变慢。但除此之外,如果我对 lock 的理解正确,我希望执行的这些多个修改不应该相互交错,因为被 lock 保护的代码一次只允许一个线程进入。

我看到的唯一另一个问题是:我认为在上面的示例代码中,对 IResourceWaitQueues 和 IWorkToPerform 的每一次其他访问也都必须使用 lock?当然,除非该访问可以与 lock 保护的代码部分交错执行。


using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace WorkProcessorSandbox
{
    /// <summary>Marker interface for a resource that work items need while processing.</summary>
    public interface IResource { }

    /// <summary>A unit of work that needs a set of resources in order to run.</summary>
    public interface IWork
    {
        /// <summary>Performs the work. Called once all needed resources are free.</summary>
        void ProcessWork();

        /// <summary>The resources this work needs; duplicates are allowed and are collapsed on enqueue.</summary>
        List<IResource> NeededResources { get; set; }
    }

    // This class's purpose is to process IWork objects by calling their ProcessWork methods when it is found
    // that the IResources they need are free. IResource objects are in general not thread-safe, so care must
    // be taken to make sure two IWork never use the same IResource simultaneously.
    // This is done by a ticketing system. Each time a new IWork comes in it "lines up" for every IResource it
    // needs. Only when it is at the front of the line for all of them does it move on to process. Because the
    // "lining up" is atomic, the order of one IWork's tickets can never interleave with another's, which
    // prevents deadlocks and race conditions.
    //
    // Lock ordering: whenever both locks are needed, _Lock_ResourceDict is taken BEFORE _Lock_WorkDict.
    public sealed class WorkProcessor
    {
        // Singleton class
        public static WorkProcessor Instance { get { return lazyInstance.Value; } }
        public static readonly Lazy<WorkProcessor> lazyInstance = new Lazy<WorkProcessor>(() => new WorkProcessor());

        // ResourceWaitQueues holds, per distinct IResource, a Queue of Guids (tickets);
        // the Guid at the front belongs to the IWork that may use the resource next.
        private readonly object _Lock_ResourceDict = new object();
        private Dictionary<IResource, Queue<Guid>> ResourceWaitQueues { get; set; }

        // WorkToProcess holds, per waiting IWork, the ticket (Guid) it holds for each IResource it needs.
        private readonly object _Lock_WorkDict = new object();
        private Dictionary<IWork, Dictionary<IResource, Guid>> WorkToProcess { get; set; }

        private WorkProcessor()
        {
            Running = false;
        }

        private bool Running { get; set; }
        private CancellationToken ProcessingToken { get; set; }
        private CancellationTokenSource ProcessingTokenSource { get; set; }

        /// <summary>
        /// Stops the processing of IWork from the WorkToProcess Dictionary.
        /// Calling it while already stopped is a no-op.
        /// </summary>
        public void StopProcessing()
        {
            if (Running)
            {
                Running = false;

                // Cancel the token handed to every Task.Run so tasks that were
                // scheduled but have not started yet are never run.
                if (ProcessingTokenSource != null)
                {
                    ProcessingTokenSource.Cancel();
                }
            }
        }

        /// <summary>
        /// Starts (allows) the processing of IWork. Resets all queues to empty.
        /// Calling it while already running is a no-op.
        /// </summary>
        public void StartProcessing()
        {
            if (!Running)
            {
                // Instantiate to Empty
                ResourceWaitQueues = new Dictionary<IResource, Queue<Guid>>();
                WorkToProcess = new Dictionary<IWork, Dictionary<IResource, Guid>>();

                // Create CancellationToken for use in controlling Tasks
                ProcessingTokenSource = new CancellationTokenSource();
                ProcessingToken = ProcessingTokenSource.Token;

                Running = true;
            }
        }

        /// <summary>
        /// Starts every waiting IWork whose tickets are all at the front of their resource queues.
        /// Runs on a background task; does nothing when processing is stopped.
        /// </summary>
        private void TryProcessWork()
        {
            if (!Running)
            {
                return;
            }

            Task.Run(() =>
            {
                // Here we need both locks (resource lock first, see lock ordering above).
                lock (_Lock_ResourceDict)
                {
                    lock (_Lock_WorkDict)
                    {
                        // Tickets currently at the front of each non-empty queue. This set cannot change
                        // while we hold _Lock_ResourceDict, so it is computed once instead of per work item.
                        // (Peek throws on an empty queue, hence the Count filter.)
                        var guidsReadyToProcess = new HashSet<Guid>(
                            ResourceWaitQueues.Values.Where(q => q.Count > 0).Select(q => q.Peek()));

                        // Iterate over a snapshot: entries are removed from WorkToProcess inside the loop,
                        // and a Dictionary must not be modified while it is being enumerated.
                        foreach (var waitingWork in WorkToProcess.ToList())
                        {
                            // Ready only if every ticket this IWork holds is at the front of its line.
                            bool ready = waitingWork.Value.Values.All(g => guidsReadyToProcess.Contains(g));
                            if (!ready)
                            {
                                continue;
                            }

                            // Remove this IWork from WorkToProcess before starting it.
                            if (!WorkToProcess.Remove(waitingWork.Key))
                            {
                                Console.Out.WriteLine("Fatal error! Stopping work processing. Could not remove IWork from Dictionary that should contain it.");
                                StopProcessing();
                                return;
                            }

                            // This IWork is ready to be processed!
                            ProcessWork(waitingWork);
                        }
                    }
                }
            }, ProcessingToken);
        }

        /// <summary>
        /// Enqueues <paramref name="workToDo"/> for processing: creates one ticket (Guid) per distinct
        /// needed IResource, lines the tickets up in the matching resource queues, records the work in
        /// WorkToProcess and finally triggers TryProcessWork in case it can run right away.
        /// Silently ignored while processing is stopped.
        /// </summary>
        /// <param name="workToDo">The work to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workToDo"/> is null (while running).</exception>
        /// <exception cref="ArgumentException">The same work instance is already waiting to be processed.</exception>
        public void EnqueueWork(IWork workToDo)
        {
            if (!Running)
            {
                return;
            }

            if (workToDo == null)
            {
                throw new ArgumentNullException("workToDo");
            }

            // Get all distinct IResource in the IWork's NeededResources (null is treated as "needs nothing").
            var worksResources = (workToDo.NeededResources ?? new List<IResource>()).Distinct().ToList();

            // Create the Guids this IWork object will wait on to start processing
            var worksGuidResourceMap = new Dictionary<IResource, Guid>();
            foreach (var resource in worksResources)
            {
                worksGuidResourceMap.Add(resource, Guid.NewGuid());
            }

            // Here we need both locks (resource lock first, see lock ordering above).
            lock (_Lock_ResourceDict)
            {
                lock (_Lock_WorkDict)
                {
                    // Reject duplicates BEFORE touching any queue, so a failed enqueue
                    // cannot leave orphan tickets behind that would block the line forever.
                    if (WorkToProcess.ContainsKey(workToDo))
                    {
                        throw new ArgumentException("This IWork is already waiting to be processed.", "workToDo");
                    }

                    foreach (var aGuidResourceMap in worksGuidResourceMap)
                    {
                        // Create the queue for an IResource seen for the first time.
                        Queue<Guid> resourceQueue;
                        if (!ResourceWaitQueues.TryGetValue(aGuidResourceMap.Key, out resourceQueue))
                        {
                            resourceQueue = new Queue<Guid>();
                            ResourceWaitQueues.Add(aGuidResourceMap.Key, resourceQueue);
                        }

                        // Take this IWork's spot in line for the resource.
                        resourceQueue.Enqueue(aGuidResourceMap.Value);
                    }

                    // Add this IWork and its processing info to the Dictionary of awaiting IWork to be processed
                    WorkToProcess.Add(workToDo, worksGuidResourceMap);
                }
            }

            // Go through the list of IWork waiting to be processed and start processing IWork that is ready
            TryProcessWork();
        }

        /// <summary>
        /// Runs the given IWork on a background task. When it finishes (successfully or not) one ticket
        /// is dequeued from the queue of every IResource it used, and TryProcessWork is called because
        /// other IWork is most likely ready to process now.
        /// </summary>
        /// <param name="workToProcess">The work together with the tickets it holds per resource.</param>
        private void ProcessWork(KeyValuePair<IWork, Dictionary<IResource, Guid>> workToProcess)
        {
            Task.Run(() =>
            {
                try
                {
                    // Actually perform the work to be processed.
                    workToProcess.Key.ProcessWork();
                }
                finally
                {
                    // Release the tickets even if the work threw; otherwise every IWork queued
                    // behind it on the same resources would wait forever.
                    // The dequeues must be atomic with respect to other queue operations.
                    // It is ok for WorkToProcess to be modified while this is happening.
                    lock (_Lock_ResourceDict)
                    {
                        try
                        {
                            // Dequeue a Guid from each used resource's Queue, exposing the next Guid in line.
                            foreach (var usedResource in workToProcess.Value.Keys)
                            {
                                Queue<Guid> resourceQueue;
                                if (ResourceWaitQueues.TryGetValue(usedResource, out resourceQueue))
                                {
                                    resourceQueue.Dequeue();
                                }
                            }
                        }
                        catch (InvalidOperationException ex)
                        {
                            Console.Out.WriteLine("Fatal error! Stopping work processing. Could not dequeue a Guid that should exist: " + ex.Message);
                            StopProcessing();
                        }
                    }

                    // Go through the list of IWork waiting to be processed and start processing IWork that is ready
                    TryProcessWork();
                }
            }, ProcessingToken);
        }
    }
}




但是,如果你的成组操作本身代表了某种一致性,也就是说,在成组操作进行期间若允许对这些对象进行其他访问就会得到错误的结果,那么是的,你还需要对这些其他访问使用 lock,并且使用同一个 _LockObj 引用,以确保成组操作不会与那些依赖于聚合数据结构一致性的其他访问同时发生。



我可以肯定地说,对字段使用 .NET 接口的命名约定(即采用 Pascal 命名法且总是以 I 开头的标识符)是一个非常糟糕的选择。这样做肯定会让别人很难理解你的代码。

