实现对象池的好方法是什么

本文关键字:方法 是什么 对象 实现 | 更新日期: 2023-09-27 18:20:24

我有一个第三方类,我们称它为Analyser。这个类非常擅长分析,但实例化成本很高(秒),而且不支持多线程。

我的应用程序需要处理涉及调用Analyser的请求。这些请求将同时发生。

我想我需要创建一个通用类,比如

public class Pool<T>
{
    public Pool(Func<T> instantiator, int size)
    {
        ...
    }
    public async Task<TResult> Invoke<TResult>(
            Func<T, TResult> target,
            CancellationToken cancellationToken)
    {
        // await the first available T,
        // lock the T,
        // invoke the target, return the result
        // release the lock
    }
}

这个类一般封装池功能。

我的问题是,实现这个类的正确方法是什么。它是否已经以不同的名称存在?我应该使用TPL.DataFlow吗?我应该手动滚动吗?

Good被定义为可靠的线程安全,越容易维护越好。


如果通用Pool是解决问题的错误方法,请提出正确的替代方案。


Pool类的使用方式如下。

private readonly Pool<Analyser> pool = new Pool<Analyser>(
        () => new Analyser(a, b, c),
        100);
public async Task<string> ProcessRequest(
        string raw,
        CancellationToken cancellationToken)
{
    return await this.pool.Invoke(
        analyser => analyser.Analyse(raw),
        cancellationToken);
}

实现对象池的好方法是什么

我认为构建一个通用池将是一项相当复杂的任务,因此我会从中获得很多乐趣:-)

注意:我的愿景与您的愿景不同的最重要的一点是,我不希望池处理与它管理的对象相关的线程问题。该池有一些与线程安全相关的代码,但仅用于管理它自己的状态(实例列表)。线程启动、停止/和/或取消是池的客户端和构建的对象所关心的问题,而不是池本身。

我会从开始

  1. 由池维护的对象的一次性包装器,在释放时将对象返回到池
  2. 一个池,用于构造或重用可用实例,并在将实例返回到客户端之前对其进行包装

一个超级简化的实现:

class PoolItem<T> : IDisposable
{
    public event EventHandler<EventArgs> Disposed;

    public PoolItem(T wrapped)
    {
        WrappedObject = wrapped;
    }

    public T WrappedObject { get; private set; }

    public void Dispose()
    {
        Disposed(this, EventArgs.Empty);
    }
}

现在的池:

class Pool<T> where T : class
{
    private static readonly object m_SyncRoot = new object();
    private readonly Func<T> m_FactoryMethod;
    private List<T> m_PoolItems = new List<T>();

    public Pool(Func<T> factoryMethod)
    {
        m_FactoryMethod = factoryMethod;
    }

    public PoolItem<T> Get()
    {
        T target = null;
        lock (m_SyncRoot)
        {
            if (m_PoolItems.Count > 0)
            {
                target = m_PoolItems[0];
                m_PoolItems.RemoveAt(0);
            }
        }
        if (target == null)
            target = m_FactoryMethod();
        var wrapper = new PoolItem<T>(target);
        wrapper.Disposed += wrapper_Disposed;
        return wrapper;
    }

    void wrapper_Disposed(object sender, EventArgs e)
    {
        var wrapper = sender as PoolItem<T>;
        lock (m_SyncRoot)
        {
            m_PoolItems.Add(wrapper.WrappedObject);
        }
    }
}

用法:

class ExpensiveConstructionObject
{
    public ExpensiveConstructionObject()
    {
        Console.WriteLine("Executing the expensive constructor...");
    }
    public void Do(string stuff)
    {
        Console.WriteLine("Doing: " + stuff);
    }
}
    class Program
{
    static void Main(string[] args)
    {
        var pool = new Pool<ExpensiveConstructionObject>(() => new ExpensiveConstructionObject());
        var t1 = pool.Get();
        t1.WrappedObject.Do("task 1");
        using (var t2 = pool.Get())
            t2.WrappedObject.Do("task 2");
        using (var t3 = pool.Get())
            t3.WrappedObject.Do("task 3");
        t1.Dispose();
        Console.ReadLine();
    }
}

接下来的步骤是:

  1. 经典泳池功能,如:初始尺寸、最大尺寸
  2. 动态代理,允许Pool::Get返回类型为T,而不是PoolItem
  3. 维护包装器的列表,以便在池自身被释放时,如果调用方没有释放它们

IIUC您试图实现的是一个通用对象池,当您没有资源可供使用时,您可以异步等待,直到您使用为止。

最简单的解决方案是使用TPL DataflowBufferBlock来保存项目,并在项目为空时等待。在您的API中,您得到一个委托并运行它,但我建议您从池中返回实际项目,并让用户决定如何处理它:

public class ObjectPool<TItem>
{
    private readonly BufferBlock<TItem> _bufferBlock;
    private readonly int _maxSize;
    private readonly Func<TItem> _creator;
    private readonly CancellationToken _cancellationToken;
    private readonly object _lock;
    private int _currentSize;
    public ObjectPool(int maxSize, Func<TItem> creator, CancellationToken cancellationToken)
    {
        _lock = new object();
        _maxSize = maxSize;
        _currentSize = 1;
        _creator = creator;
        _cancellationToken = cancellationToken;
        _bufferBlock = new BufferBlock<TItem>(new DataflowBlockOptions{CancellationToken = cancellationToken});
    }
    public void Push(TItem item)
    {
        if (!_bufferBlock.Post(item) || _bufferBlock.Count > _maxSize)
        {
            throw new Exception();
        }
    }
    public Task<TItem> PopAsync()
    {
        TItem item;
        if (_bufferBlock.TryReceive(out item))
        {
            return Task.FromResult(item);
        }
        if (_currentSize < _maxSize)
        {
            lock (_lock)
            {
                if (_currentSize < _maxSize)
                {
                    _currentSize++;
                    _bufferBlock.Post(_creator());
                }
            }
        }
        return _bufferBlock.ReceiveAsync();
    }
}

解释:

  • 我使用锁来确保一次只创建一个新项目,如果需要很长时间,可以很容易地用AsyncLock替换
  • 我使用双重检查锁定来优化所有项目都已创建的常见情况
  • PopAsync返回一个Task,但不是一个异步方法,所以只要有要返回的项,它就会同步完成。它只在池为空并且已达到限制时等待

您可以添加一个返回IDisposable的方法,这样您就可以不用担心地使用scope将其放入:

public async Task<Disposable> GetDisposableAsync()
{
    return new Disposable(this, await PopAsync());
}
public class Disposable : IDisposable
{
    private readonly ObjectPool<TItem> _pool;
    public TItem Item { get; set; }
    public Disposable(ObjectPool<TItem> pool, TItem item)
    {
        Item = item;
        _pool = pool;
    }
    public void Dispose()
    {
        _pool.Push(Item);
    }
}

池是一个很好的解决方案。毕竟,池正是用于此目的的(维护一组成本太高而无法每次实例化的对象:数据库连接、线程等)

然而,如果你想构建一个通用池,你必须非常小心:你的代码用户可能会做"意想不到"的事情,最终会开枪自杀。

例如,锁定:您应该真正检查这是否会导致死锁。如果需要,在飞行中扩大游泳池,或者如果代理要求更多的物体,则投掷。。。例外情况也应该小心处理。

因此,"等待第一个可用的T"answers"锁定T"步骤应该完全由池处理,它应该进行所有必要的检查,以避免出现尴尬的情况。如果你想(例如嵌套锁定或类似的东西),你可以考虑为你的"客户端代码"(目标)提供对池的引用,以要求额外的锁定能力

更实际的做法是:您可以从一个专门适用于Analyser类的解决方案开始,然后在需要时从那里着手开发通用池?