如何在C#中处理许多并发交互的实体
本文关键字:并发 交互 实体 许多 处理 | 更新日期: 2023-09-27 18:24:00
我正在开发一个应用程序,该应用程序具有许多可以同时交互的不同实体。我想知道这些实体以线程安全的方式相互交互的最佳方式是什么。
为了用一些简化的代码进行演示,请考虑每个实体都有自己的光纤和一些状态:
class Fiber
{
private ActionBlock<Action> _workQueue;
public Fiber()
{
_workQueue = new ActionBlock<Action>((a) => a());
}
public void Enqueue(Action a)
{
_workQueue.Post(a);
}
public void Stop()
{
_workQueue.Complete();
}
}
class EntityState
{
public int x { get; set; }
}
class Entity
{
private Fiber _fiber = new Fiber();
public EntityState State { get; set; }
// ...
}
假设操作在实体光纤上任意排队。一个这样的动作可能是一个实体必须修改另一个实体的状态。我考虑了两种选择来以线程安全的方式进行此操作。
选项1:只允许通过线程安全包装器进行状态突变,即
class Entity
{
private Fiber _fiber = new Fiber();
private ReaderWriterLockSlim _stateLock = new ReaderWriterLockSlim();
private EntityState _state = new EntityState();
public T ReadState<T>(Func<EntityState, T> reader)
{
T result = default(T);
_stateLock.EnterReadLock();
result = reader(_state);
_stateLock.ExitReadLock();
return result;
}
public void WriteState(Action<EntityState> writer)
{
_stateLock.EnterWriteLock();
writer(_state);
_stateLock.ExitWriteLock();
}
// ...
}
选项2:只允许状态突变,将其调度到拥有实体的光纤上,并返回Future,以便突变者可以看到突变何时发生,即
class Future<T>
{
public T Value { get; set; }
}
class Entity
{
private Fiber _fiber = new Fiber();
private EntityState _state = new EntityState();
public Future<T> AccessState<T>(Func<EntityState, T> accessor)
{
Future<T> future = new Future<T>();
_fiber.Enqueue(() => future.Value = accessor(_state));
return future;
}
// ...
}
我还没有考虑其他什么选择?有什么好方法可以做到这一点吗?我应该这么做吗?
所有的选择都会给你带来痛苦。
- 即使代码在技术上是正确的,您也会在您的域中遇到合乎逻辑的种族条件。类似订单的东西可以在付款前发货
- 这会损害可维护性。线程代码很难调试,很难测试,也很难阅读。当它与应用程序交错时,情况会变得更加复杂
正确的方法是将线程代码与应用程序代码完全分离。将任务放入单线程光纤中。任务在所有涉及的实体中同步执行所有作业。任务完成后,可以异步执行IO。我已经为这种方法编写了一个库。
您可以将突变排入所属光纤,然后将其延续排入您自己的光纤。这样你就没有任何显式的锁了。
但是:这种Fiber方法并不比在访问实体之前只使用lock
更好。(锁内部包含一个队列。)
此外,您不能以这种方式进行跨实体交易。使用锁方法,您可以收集参与事务的所有实体的锁,将它们排序为一个总顺序,并将它们全部锁定。这使您可以在没有死锁的情况下进行跨实体事务。
目前,我或多或少地选择了选项2,但如果访问状态不会被阻止,我会同步设置结果。即
using System;
using System.Threading;
namespace Server.Utility
{
public class ThreadSafeWrapper<ObjectType>
{
private ObjectType m_object;
private Fiber m_fiber;
public ThreadSafeWrapper(ObjectType obj, Fiber fiber)
{
m_object = obj;
m_fiber = fiber;
}
public Future<ReturnType> Transaction<ReturnType>(Func<ObjectType, ReturnType> accessor)
{
Future<ReturnType> future = new Future<ReturnType>();
ReturnType synchronousResult = default(ReturnType);
if (Monitor.TryEnter(m_object))
{
synchronousResult = accessor(m_object);
Monitor.Exit(m_object);
future.SetResult(synchronousResult);
}
else
{
m_fiber.Enqueue(() =>
{
ReturnType result = default(ReturnType);
lock (m_object)
{
result = accessor(m_object);
}
future.SetResult(result);
});
}
return future;
}
public void Transaction(Action<ObjectType> accessor)
{
if (Monitor.TryEnter(m_object))
{
accessor(m_object);
Monitor.Exit(m_object);
}
else
{
m_fiber.Enqueue(() =>
{
lock (m_object)
{
accessor(m_object);
}
});
}
}
}
}