如何在C#中处理许多并发交互的实体

本文关键字:并发 交互 实体 许多 处理 | 更新日期: 2023-09-27 18:24:00

我正在开发一个应用程序,该应用程序具有许多可以同时交互的不同实体。我想知道这些实体以线程安全的方式相互交互的最佳方式是什么。

为了用一些简化的代码进行演示,请考虑每个实体都有自己的光纤和一些状态:

class Fiber
{
    private ActionBlock<Action> _workQueue;
    public Fiber()
    {
        _workQueue = new ActionBlock<Action>((a) => a());
    }
    public void Enqueue(Action a)
    {
        _workQueue.Post(a);
    }
    public void Stop()
    {
        _workQueue.Complete();
    }
}
class EntityState
{
    public int x { get; set; }
}
class Entity
{
    private Fiber _fiber = new Fiber();
    public EntityState State { get; set; }
    // ...
}

假设操作在实体光纤上任意排队。一个这样的动作可能是一个实体必须修改另一个实体的状态。我考虑了两种选择来以线程安全的方式进行此操作。

选项1:只允许通过线程安全包装器进行状态突变,即

class Entity
{
    private Fiber _fiber = new Fiber();
    private ReaderWriterLockSlim _stateLock = new ReaderWriterLockSlim();
    private EntityState _state = new EntityState();
    public T ReadState<T>(Func<EntityState, T> reader)
    {
        T result = default(T);
        _stateLock.EnterReadLock();
        result = reader(_state);
        _stateLock.ExitReadLock();
        return result;
    }
    public void WriteState(Action<EntityState> writer)
    {
        _stateLock.EnterWriteLock();
        writer(_state);
        _stateLock.ExitWriteLock();
    }
    // ...
}

选项2:只允许状态突变,将其调度到拥有实体的光纤上,并返回Future,以便突变者可以看到突变何时发生,即

class Future<T>
{
    public T Value { get; set; }
}
class Entity
{
    private Fiber _fiber = new Fiber();
    private EntityState _state = new EntityState();
    public Future<T> AccessState<T>(Func<EntityState, T> accessor)
    {
        Future<T> future = new Future<T>();
        _fiber.Enqueue(() => future.Value = accessor(_state));
        return future;
    }
    // ...
}

我还没有考虑其他什么选择?有什么好方法可以做到这一点吗?我应该这么做吗?

如何在C#中处理许多并发交互的实体

所有的选择都会给你带来痛苦。

  1. 即使代码在技术上是正确的,您也会在您的域中遇到合乎逻辑的种族条件。类似订单的东西可以在付款前发货
  2. 这会损害可维护性。线程代码很难调试,很难测试,也很难阅读。当它与应用程序交错时,情况会变得更加复杂

正确的方法是将线程代码与应用程序代码完全分离。将任务放入单线程光纤中。任务在所有涉及的实体中同步执行所有作业。任务完成后,可以异步执行IO。我已经为这种方法编写了一个库。

您可以将突变排入所属光纤,然后将其延续排入您自己的光纤。这样你就没有任何显式的锁了。

但是:这种Fiber方法并不比在访问实体之前只使用lock更好。(锁内部包含一个队列。)

此外,您不能以这种方式进行跨实体交易。使用锁方法,您可以收集参与事务的所有实体的锁,将它们排序为一个总顺序,并将它们全部锁定。这使您可以在没有死锁的情况下进行跨实体事务。

目前,我或多或少地选择了选项2,但如果访问状态不会被阻止,我会同步设置结果。即

using System;
using System.Threading;
namespace Server.Utility
{
    public class ThreadSafeWrapper<ObjectType>
    {
        private ObjectType m_object;
        private Fiber m_fiber;
        public ThreadSafeWrapper(ObjectType obj, Fiber fiber)
        {
            m_object = obj;
            m_fiber = fiber;
        }
        public Future<ReturnType> Transaction<ReturnType>(Func<ObjectType, ReturnType> accessor)
        {
            Future<ReturnType> future = new Future<ReturnType>();
            ReturnType synchronousResult = default(ReturnType);
            if (Monitor.TryEnter(m_object))
            {
                synchronousResult = accessor(m_object);
                Monitor.Exit(m_object);
                future.SetResult(synchronousResult);
            }
            else
            {
                m_fiber.Enqueue(() =>
                {
                    ReturnType result = default(ReturnType);
                    lock (m_object)
                    {
                        result = accessor(m_object);
                    }
                    future.SetResult(result);
                });
            }
            return future;
        }
        public void Transaction(Action<ObjectType> accessor)
        {
            if (Monitor.TryEnter(m_object))
            {
                accessor(m_object);
                Monitor.Exit(m_object);
            }
            else
            {
                m_fiber.Enqueue(() =>
                {
                    lock (m_object)
                    {
                        accessor(m_object);
                    }
                });
            }
        }
    }
}