“固定”/“负载平衡”C# 线程池

本文关键字:线程 负载平衡 负载 固定 平衡 | 更新日期: 2023-09-27 18:33:53

我有一个"昂贵"的第三方组件。此组件不是线程安全的。所述组件托管在WCF服务中(目前(,因此...每次调用进入服务时,我都必须更新组件。

相反,我想做的是拥有一个包含 16 个线程的池,每个线程都启动自己的组件副本,并具有调用该方法并将其分发到 16 个线程之一并返回值的机制。

所以像这样简单的事情:

var response = threadPool.CallMethod(param1, param2);
调用阻止直到

它得到响应很好,因为我需要响应才能继续。

有什么建议吗?也许我想多了,由 16 个线程提供服务的ConcurrentQueue可以完成这项工作,但现在确定方法返回值将如何返回给调用方?

“固定”/“负载平衡”C# 线程池

WCF 已经使用线程池来管理其资源,因此,如果在此基础上添加一层线程管理,它只会很糟糕。如果可能,请避免这样做,因为您的服务调用会遇到争用。

在您的情况下,我会做的只是使用单个 ThreadLocal 或线程静态,它会用您昂贵的对象初始化一次。此后,它将可用于线程池线程。

这是假设您的对象在 MTA 线程上很好;我猜它来自您的帖子,因为听起来事情目前正在工作,但速度很慢。

有人担心会创建太多对象,并且随着池变得太大而使用太多内存。但是,在执行其他任何操作之前,请查看在实践中是否是这种情况。这是一个非常简单的策略,易于实施,因此易于试用。只有当你真的需要时,才会变得更复杂。

首先,我同意@briantyler:ThreadLocal<T>或线程静态字段可能是您想要的。您应该以此为起点,如果它不能满足您的需求,请考虑其他选择。

一个复杂但灵活的替代方法是单一实例对象池。在最简单的形式中,池类型将如下所示:

public sealed class ObjectPool<T>
{
    private readonly ConcurrentQueue<T> __objects = new ConcurrentQueue<T>();
    private readonly Func<T> __factory;
    public ObjectPool(Func<T> factory)
    {
        __factory = factory;
    }
    public T Get()
    {
        T obj;
        return __objects.TryDequeue(out obj) ? obj : __factory();
    }
    public void Return(T obj)
    {
        __objects.Enqueue(obj);
    }
}

如果您T从基元类或结构(即 ObjectPool<MyComponent> (,因为池中没有任何内置的线程控件。但是你可以用你的T类型代替一个Lazy<T>Task<T>的monad,并得到你想要的。

池初始化:

Func<Task<MyComponent>> factory = () => Task.Run(() => new MyComponent());
ObjectPool<Task<MyComponent>> pool = new ObjectPool<Task<MyComponent>>(factory);
// "Pre-warm up" the pool with 16 concurrent tasks.
// This starts the tasks on the thread pool and
// returns immediately without blocking.
for (int i = 0; i < 16; i++) {
    pool.Return(pool.Get());
}

用法:

// Get a pooled task or create a new one. The task may
// have already completed, in which case Result will
// be available immediately. If the task is still
// in flight, accessing its Result will block.
Task<MyComponent> task = pool.Get();
try
{
    MyComponent component = task.Result; // Alternatively you can "await task"
    // Do something with component.
}
finally
{
    pool.Return(task);
}

此方法比在 ThreadLocal 或线程静态字段中维护组件更复杂,但如果您需要做一些花哨的事情,例如限制池实例的数量,池抽象可能非常有用。

编辑

基本的"固定的 X 实例集"池实现,带有一个Get,一旦池被耗尽,就会阻塞:

public sealed class ObjectPool<T>
{
    private readonly Queue<T> __objects;
    public ObjectPool(IEnumerable<T> items)
    {
        __objects = new Queue<T>(items);
    }
    public T Get()
    {
        lock (__objects)
        {
            while (__objects.Count == 0) {
                Monitor.Wait(__objects);
            }
            return __objects.Dequeue();
        }
    }
    public void Return(T obj)
    {
        lock (__objects)
        {
            __objects.Enqueue(obj);
            Monitor.Pulse(__objects);
        }
    }
}