建议使用TPL,用于超长寿命螺纹

本文关键字:用于 TPL | 更新日期: 2023-09-27 18:20:14

我已经阅读了任务并行库的一些MSDN文档(http://msdn.microsoft.com/en-us/library/dd537609(v=vs.110).aspx),特别是关于TPL的最佳实践用法。

我有一个启动线程的应用程序。线程的目的是监视队列并"处理"已添加的项目。队列中项目的处理需要按顺序进行,因此我不希望同时处理队列中的多个项目。线程的寿命与windows进程一样长,并且只有在应用程序退出时才会关闭。

我想知道启动这个后台线程作为使用TPL的任务的利弊。

我最初的直觉是不使用TPL,因为我会在运行应用程序的整个生命周期中占用线程池线程,这可能会阻止其他任务的运行,并干扰线程池的最佳运行。

我不确定手动启动一个新的后台线程来执行工作会如何影响TPL在应用程序的其他独立部分中的任何使用。

我想知道在这种情况下,手动线程还是任务是推荐的方法?或者,为了做出明智的选择,我还需要考虑哪些其他因素?

我已经包含了下面的代码。请注意,代码中的"处理器"通常执行CPU绑定操作,然后"处理程序"通常执行IO绑定操作:

public class TransactionProcessor : IDisposable
{
    private readonly IProcessorConfiguration configuration;
    private readonly IList<Tuple<string, IProcessor>> processors = new List<Tuple<string, IProcessor>>();
    private readonly IList<Tuple<string, IHandler>> handlers = new List<Tuple<string, IHandler>>(); 
    private AutoResetEvent waitForWork = new AutoResetEvent(true);
    private object lockObject = new object();
    private bool processThreadShouldExit = false;
    private Thread processorThread; 
    private Queue queue = new Queue();
    public TransactionProcessor(IProcessorConfiguration configuration)
    {
        if (configuration == null)
        {
            throw new ArgumentNullException("configuration");
        }
        this.configuration = configuration;
        this.Initialise();
    }
    public void Start()
    {
        lock (this.lockObject)
        {
            if (this.processorThread == null)
            {
                this.processThreadShouldExit = false;
                this.processorThread = new Thread(this.Dispatcher);
                this.processorThread.Start();
            }
        }
    }
    public void Stop()
    {
        if (this.processorThread != null)
        {
            this.processThreadShouldExit = true;
            this.waitForWork.Set();
            this.processorThread.Join();
            this.processorThread = null;
        }
    }   
    public void QueueTransactionForProcessing(Transaction Transaction, Guid clientID)
    {
        var queueObject = new QueueObject() { Transaction = Transaction };
        lock (this.lockObject)
        {
            this.queue.Enqueue(queueObject);
        }
        if (this.waitForWork != null)
        {
            this.waitForWork.Set();
        }
    }
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
        if (this.processorThread != null)
        {
            this.Stop();
        }
        if (this.waitForWork != null)
        {
            this.waitForWork.Dispose();
            this.waitForWork = null;
        }
    }
    private void Dispatcher()
    {
        if (this.queue.Count == 0)
        {
            this.waitForWork.Reset();
        }
        while (!this.processThreadShouldExit)
        {
            if (this.queue.Count > 0 || this.waitForWork.WaitOne(60000))
            {
                while (this.queue.Count > 0 && !this.processThreadShouldExit)
                {
                    QueueObject queueObject;
                    lock (this.lockObject)
                    {
                        queueObject = (QueueObject)this.queue.Dequeue();
                    }
                    this.ProcessQueueItem(queueObject);
                }
                if (this.queue.Count == 0)
                {
                    this.waitForWork.Reset();
                }
            }
        }
    }   
    private void ProcessQueueItem(QueueObject item)
    {
        var correlationId = Guid.NewGuid();
        try
        {
            bool continuePipeline = true;
            foreach (var processor in this.processors)
            {
                processor.Item2.Process(item.Transaction, correlationId, ref continuePipeline);
                if (!continuePipeline)
                {
                    break;
                }
            }
            if (continuePipeline)
            {
                foreach (var handler in this.handlers)
                {
                    Transaction clonedTransaction = item.Transaction.Clone();
                    try
                    {
                        handler.Item2.Handle(clonedTransaction, correlationId);
                    }
                    catch (Exception e)
                    {
                    }
                }
            }
        }
        catch (Exception e)
        {               
        }
    }
    private void Initialise()
    {       
        foreach (var processor in this.configuration.Processors)
        {
            try
            {
                Type processorType = Type.GetType(processor.Value);
                if (processorType != null && typeof(IProcessor).IsAssignableFrom(processorType))
                {
                    var processorInstance = (IProcessor)Activator.CreateInstance(processorType);
                    this.processors.Add(new Tuple<string, IProcessor>(processor.Key, processorInstance));
                }
            catch (Exception e)
            {               
            }
        }
        foreach (var handler in this.configuration.Handlers)
        {
            try
            {
                Type handlerType = Type.GetType(handler.Value);
                if (handlerType != null && typeof(IHandler).IsAssignableFrom(handlerType))
                {
                    var handlerInstance = (IHandler)Activator.CreateInstance(handlerType);
                    this.handlers.Add(new Tuple<string, IHandler>(handler.Key, handlerInstance));
                }
            }
            catch (Exception e)
            {   
            }
        }
    }
}

建议使用TPL,用于超长寿命螺纹

您应该有所不同。很少需要使用像Thread这样的低级别构造,TPL也可以满足长时间运行的任务,使用它可以产生更灵活和可维护的代码。

您可以使用启动长时间运行的任务

 Task.Factory.StartNew(() => {}, TaskCreationOptions.LongRunning);

来自MSDN文档:

LongRunning指定任务将是长时间运行的,粗粒度操作涉及比细粒度系统。它向TaskScheduler提供了一个提示超额认购可能是有保证的。超额订阅允许您创建比可用的硬件线程数更多的线程。

因此,调度程序可以创建额外的线程,以确保ThreadPool的容量足够。此外,手动创建Thread不会影响您在代码的其他部分中对TPL的使用。

就我个人而言,我肯定会选择TPL而不是手动创建线程。有一点学习曲线,但它是一个非常强大的库,可以满足各种各样的场景。