Splitting work across threads


At the moment I'm handling the frames read from a video one at a time and then writing them to a file. This seems inefficient and slow, so I'd like to split the work across several threads.

My current code can be summarized as follows:

for(long n = 0; n < totalframes; n++) {
    using(Bitmap frame = vreader.ReadVideoFrame()) {
        Process(frame); //awfully slow
        WriteToFile(frame);
    }
}

How can I load, say, four frames, process them in four threads, wait for all of them to finish, and then write them to the file? It's important that the frames are written in exactly the same order as they appear in the video.

Splitting work across threads

You could use, for example, Parallel.ForEach() to process the frames, reading them with an iterator block (IEnumerable<>).

Writing, however, needs a bit more care. Make sure you attach a number to each frame and, at the end of processing, dump the frames into a BlockingCollection<T>. Start a separate thread (Task) that drains that queue and writes the frames out in order. This is the typical N-producer/1-consumer solution.
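
A minimal sketch of that idea (reusing the VReader, Bitmap, Process and WriteToFile stand-ins from the pipeline example further down; the ReadFrames and RunParallelForEach helpers are illustrative names, not part of the original answer):

// Uses System.Collections.Concurrent, System.Collections.Generic and System.Threading.Tasks.
private static IEnumerable<KeyValuePair<long, Bitmap>> ReadFrames(VReader vreader, long totalframes)
{
    // Iterator block: tags each frame with its index as it is read.
    for (long n = 0; n < totalframes; n++)
    {
        yield return new KeyValuePair<long, Bitmap>(n, vreader.ReadVideoFrame());
    }
}

private static void RunParallelForEach(VReader vreader, long totalframes)
{
    var processed = new BlockingCollection<KeyValuePair<long, Bitmap>>();

    // 1 consumer: writes frames strictly in index order, buffering any frame
    // that arrives ahead of its turn.
    var writer = Task.Run(() =>
    {
        long next = 0;
        var pending = new SortedDictionary<long, Bitmap>();
        foreach (var item in processed.GetConsumingEnumerable())
        {
            pending.Add(item.Key, item.Value);
            Bitmap frame;
            while (pending.TryGetValue(next, out frame))
            {
                WriteToFile(frame);
                frame.Dispose();
                pending.Remove(next);
                next++;
            }
        }
    });

    // N producers: Parallel.ForEach pulls frames off the iterator (the default
    // partitioner serializes access to the underlying enumerator) and processes
    // them on multiple threads.
    Parallel.ForEach(ReadFrames(vreader, totalframes), item =>
    {
        Process(item.Value);
        processed.Add(item);
    });

    processed.CompleteAdding();
    writer.Wait();
}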

This is where you want a Pipeline. I copied the code almost verbatim from Patterns of Parallel Programming and introduced additional parallelism in step 2 (I've included examples using both parallel tasks and PLINQ). It's not too complex, it works well, and on my machine it runs many times faster than the sequential version. You may not see the same degree of improvement with your code (since I'm guessing your Process does rather more than Thread.Sleep), but it will still run faster.

Obviously there's a fair amount of clutter because of the extra parallelism and my attempt to match your object model. Refer to the original, simple sample code on page 55 of Patterns of Parallel Programming; it's a thing of beauty, so do check it out (http://www.microsoft.com/en-au/download/details.aspx?id=19222).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace PipelineExample
{
    /// <summary>
    /// Stack Overflow question 16882318.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// This is our simulated "file". In essence it will contain the
        /// ID of each Frame which has been processed and written to file.
        /// </summary> 
        private static readonly List<long> FrameFile = new List<long>();
        /// <summary>
        /// This is a modification of Stephen Toub's Pipelines
        /// example from Patterns Of Parallel Programming.
        /// </summary>
        private static void RunPipeline(VReader vreader, long totalframes)
        {
            var rawFrames = new BlockingCollection<Bitmap>();
            var processedFrames = new BlockingCollection<Bitmap>();
            // Stage 1: read raw frames.
            var readTask = Task.Run(() =>
            {
                try
                {
                    for (long n = 0; n < totalframes; n++)
                    {
                        rawFrames.Add(vreader.ReadVideoFrame());
                    }
                }
                finally { rawFrames.CompleteAdding(); }
            });
            // Stage 2: process frames in parallel.
            var processTask = Task.Run(() =>
            {
                try
                {
                    // Try both - see which performs better in your scenario.
                    Step2WithParallelTasks(rawFrames, processedFrames);
                    //Step2WithPLinq(rawFrames, processedFrames);
                }
                finally { processedFrames.CompleteAdding(); }
            });
            // Stage 3: write results to file and dispose of the frame.
            var writeTask = Task.Run(() =>
            {
                foreach (var processedFrame in processedFrames.GetConsumingEnumerable())
                {
                    WriteToFile(processedFrame);
                    processedFrame.Dispose();
                }
            });
            Task.WaitAll(readTask, processTask, writeTask);
        }
        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithPLinq(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via PLinq.");
            var processed = rawFrames.GetConsumingEnumerable()
                .AsParallel()
                .AsOrdered()
                .Select(frame =>
                {
                    Process(frame);
                    return frame;
                });
            foreach (var frame in processed)
            {
                processedFrames.Add(frame);
            }
        }
        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithParallelTasks(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via parallel tasks.");
            var degreesOfParallelism = Environment.ProcessorCount;
            var inbox = rawFrames.GetConsumingEnumerable();
            // Start our parallel tasks.
            while (true)
            {
                var tasks = inbox
                    .Take(degreesOfParallelism)
                    .Select(frame => Task.Run(() =>
                    {
                        Process(frame);
                        return frame;
                    }))
                    .ToArray();
                if (tasks.Length == 0)
                {
                    break;
                }
                Task.WaitAll(tasks);
                foreach (var t in tasks)
                {
                    processedFrames.Add(t.Result);
                }
            }
        }
        /// <summary>
        /// Sequential implementation - as is (for comparison).
        /// </summary>
        private static void RunSequential(VReader vreader, long totalframes)
        {
            for (long n = 0; n < totalframes; n++)
            {
                using (var frame = vreader.ReadVideoFrame())
                {
                    Process(frame);
                    WriteToFile(frame);
                }
            }
        }
        /// <summary>
        /// Main entry point.
        /// </summary>
        private static void Main(string[] args)
        {
            // Arguments.
            long totalframes = 1000;
            var vreader = new VReader();
            // We'll time our run.
            var sw = Stopwatch.StartNew();
            // Try both for comparison.
            //RunSequential(vreader, totalframes);
            RunPipeline(vreader, totalframes);
            sw.Stop();
            Console.WriteLine("Elapsed ms: {0}.", sw.ElapsedMilliseconds);
            // Validation: count, order and contents.
            if (Range(1, totalframes).SequenceEqual(FrameFile))
            {
                Console.WriteLine("Frame count and order of frames in the file are CORRECT.");
            }
            else
            {
                Console.WriteLine("Frame count and order of frames in the file are INCORRECT.");
            }
            Console.ReadLine();
        }
        /// <summary>
        /// Simulate CPU work.
        /// </summary>
        private static void Process(Bitmap frame)
        {
            Thread.Sleep(10);
        }
        /// <summary>
        /// Simulate IO pressure.
        /// </summary>
        private static void WriteToFile(Bitmap frame)
        {
            Thread.Sleep(5);
            FrameFile.Add(frame.ID);
        }
        /// <summary>
        /// Naive implementation of Enumerable.Range(int, int) for long.
        /// </summary>
        private static IEnumerable<long> Range(long start, long count)
        {
            for (long i = start; i < start + count; i++)
            {
                yield return i;
            }
        }
        private class VReader
        {
            public Bitmap ReadVideoFrame()
            {
                return new Bitmap();
            }
        }
        private class Bitmap : IDisposable
        {
            private static int MaxID;
            public readonly long ID;
            public Bitmap()
            {
                this.ID = Interlocked.Increment(ref MaxID);
            }
            public void Dispose()
            {
                // Dummy method.
            }
        }
    }
}

To operate on the elements in parallel, use System.Linq's parallel methods, such as ParallelEnumerable.Range(). To preserve the order of the elements, use .AsOrdered():

var frames = ParallelEnumerable.Range(0, (int)totalframes)
                               .AsOrdered()
                               .Select(x => vreader.ReadVideoFrame())
                               .Select(frame => { Process(frame); return frame; });
foreach (var frame in frames)
{
    WriteToFile(frame); // the ordered merge hands frames back in source order
}

Yes: you need a thread pool, some threads, a class holding the input image data plus a 'sequence number' or 'frame number' to identify the order, and a thread-safe 'ReSerializer' class with a container that caches any 'out of order' frames received until the earlier ones come in.

Maybe four BackgroundWorkers. Pass a number from 1 to 4 to each of them in addition to the data itself, and in their RunWorkerCompleted event handlers check whether the other three have already finished... (you could use a bool[4]).

As far as I know, you don't have to worry about two RunWorkerCompleted handlers being invoked at the same time, because they all run on the same thread.
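
A rough sketch of what such a 'ReSerializer' might look like (the class and member names here are illustrative; Bitmap stands in for whatever frame type is used):

// Thread-safe re-serializer: workers push frames in any order, and frames are
// written out strictly by ascending frame number.
public sealed class ReSerializer
{
    private readonly object _sync = new object();
    private readonly SortedDictionary<long, Bitmap> _pending = new SortedDictionary<long, Bitmap>();
    private readonly Action<Bitmap> _writeToFile;
    private long _nextFrame; // next frame number that may be written

    public ReSerializer(Action<Bitmap> writeToFile)
    {
        _writeToFile = writeToFile;
    }

    // Called from any worker thread (e.g. a RunWorkerCompleted handler) once a
    // frame has been processed.
    public void Push(long frameNumber, Bitmap frame)
    {
        lock (_sync)
        {
            _pending.Add(frameNumber, frame);
            Bitmap next;
            while (_pending.TryGetValue(_nextFrame, out next))
            {
                _writeToFile(next);
                next.Dispose();
                _pending.Remove(_nextFrame);
                _nextFrame++;
            }
        }
    }
}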

I had a similar problem, which I asked about in this thread.

I did come up with a solution that seems OK, although it may be more complicated than you need.

It revolves around you being able to supply three delegates: one to retrieve a work item (in your case it would return a Bitmap), one to process that work item, and finally one to output that work item. It also lets you specify the maximum number of concurrent threads that will run; you can use that to limit memory usage. See the numTasks parameter in the ParallelBlockProcessor constructor below.

Only the process delegate is called by multiple threads.

Like you, I needed to make sure the final output was in the same order as the original input. I used a priority queue for that.

There may be a better solution using .NET 4.5's TPL, but I was limited to .NET 4.
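
(As an aside, a .NET 4.5 alternative could be built on TPL Dataflow; the sketch below assumes the System.Threading.Tasks.Dataflow package and borrows Bitmap, Process, WriteToFile, vreader and totalframes from the question. A TransformBlock emits its results in input order even with MaxDegreeOfParallelism greater than one, so no explicit re-ordering is needed.)

// Sketch of a TPL Dataflow pipeline (System.Threading.Tasks.Dataflow package).
var processBlock = new TransformBlock<Bitmap, Bitmap>(
    frame =>
    {
        Process(frame);
        return frame;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = Environment.ProcessorCount * 2 // limits memory usage
    });

var writeBlock = new ActionBlock<Bitmap>(frame =>
{
    WriteToFile(frame);
    frame.Dispose();
});

processBlock.LinkTo(writeBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (long n = 0; n < totalframes; n++)
{
    // SendAsync blocks (via Wait) when the bounded capacity is full, which
    // keeps the reader from racing too far ahead of the processors.
    processBlock.SendAsync(vreader.ReadVideoFrame()).Wait();
}

processBlock.Complete();
writeBlock.Completion.Wait();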

Here's the code I came up with; I think you can adapt it to your problem:

The ParallelBlockProcessor class:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using ConsoleApplication1;
namespace Demo
{
    public sealed class ParallelBlockProcessor<T> where T: class
    {
        public delegate T Read();            // Called by only one thread.
        public delegate T Process(T block);  // Called simultaneously by multiple threads.
        public delegate void Write(T block); // Called by only one thread.
        public ParallelBlockProcessor(Read read, Process process, Write write, int numTasks = 0)
        {
            Contract.Requires(read != null);
            Contract.Requires(process != null);
            Contract.Requires(write != null);
            Contract.Requires((0 <= numTasks) && (numTasks <= 64));
            _read    = read;
            _process = process;
            _write   = write;
            numTasks = (numTasks > 0) ? numTasks : Environment.ProcessorCount;
            _workPool   = new BlockingCollection<WorkItem>(numTasks*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numTasks);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _processors  = new Task[numTasks];
            initWorkItems();
            startProcessors();
            Task.Factory.StartNew(enqueueBlocks);
            _dequeuer = Task.Factory.StartNew(dequeueBlocks);
        }
        private void startProcessors()
        {
            for (int i = 0; i < _processors.Length; ++i)
            {
                _processors[i] = Task.Factory.StartNew(processBlocks);
            }
        }
        private void initWorkItems()
        {
            for (int i = 0; i < _workPool.BoundedCapacity; ++i)
            {
                _workPool.Add(new WorkItem());
            }
        }
        private void enqueueBlocks()
        {
            int index = 0;
            while (true)
            {
                T data = _read();
                if (data == null)
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special terminator WorkItem.
                    break;
                }
                WorkItem workItem = _workPool.Take();
                workItem.Data = data;
                workItem.Index = index++;
                _inputQueue.Add(workItem);
            }
        }
        private void dequeueBlocks()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;
            while (true)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem();   // There will always be at least one item - the sentinel item.
                while (_outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.
                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index);
                        _workPool.Add(new WorkItem()); // Free up a work pool item.     
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }
                    if (index == last)
                    {
                        return;
                    }
                }
            }
        }
        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }
        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _dequeuer.Wait(maxMillisecondsToWait);
        }
        private sealed class WorkItem // Note: This is mutable.
        {
            public T   Data  { get; set; }
            public int Index { get; set; }
        }
        private readonly Task[] _processors;
        private readonly Task _dequeuer;
        private readonly BlockingCollection<WorkItem> _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;
        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

The priority queue (adapted from a Microsoft sample):

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
namespace ConsoleApplication1
{
    /// <summary>Provides a thread-safe priority queue data structure.</summary> 
    /// <typeparam name="TKey">Specifies the type of keys used to prioritize values.</typeparam> 
    /// <typeparam name="TValue">Specifies the type of elements in the queue.</typeparam> 
    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")] 
    public sealed class ConcurrentPriorityQueue<TKey, TValue> : 
        IProducerConsumerCollection<KeyValuePair<TKey,TValue>>  
        where TKey : IComparable<TKey> 
    { 
        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class.</summary> 
        public ConcurrentPriorityQueue() {} 
        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class that contains elements copied from the specified collection.</summary> 
        /// <param name="collection">The collection whose elements are copied to the new ConcurrentPriorityQueue.</param> 
        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
        public ConcurrentPriorityQueue(IEnumerable<KeyValuePair<TKey, TValue>> collection) 
        { 
            if (collection == null) throw new ArgumentNullException("collection"); 
            foreach (var item in collection) _minHeap.Insert(item); 
        } 
        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="priority">The priority of the item to be added.</param> 
        /// <param name="value">The item to be added.</param> 
        public void Enqueue(TKey priority, TValue value) 
        { 
            Enqueue(new KeyValuePair<TKey, TValue>(priority, value)); 
        } 
        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="item">The key/value pair to be added to the queue.</param> 
        public void Enqueue(KeyValuePair<TKey, TValue> item) 
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }
        /// <summary>Waits for a new item to appear.</summary>
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }
        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue successfully; otherwise, false. 
        /// </returns> 
        public bool TryDequeue(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Remove(); 
                    return true; 
                } 
            } 
            return false; 
        } 
        /// <summary>Attempts to return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object. 
        /// The queue was not modified by the operation. 
        /// </param> 
        /// <returns> 
        /// true if an element was returned from the queue successfully; otherwise, false. 
        /// </returns> 
        public bool TryPeek(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Peek(); 
                    return true; 
                } 
            } 
            return false; 
        } 
        /// <summary>Empties the queue.</summary> 
        public void Clear() { lock(_syncLock) _minHeap.Clear(); } 
        /// <summary>Gets whether the queue is empty.</summary> 
        public bool IsEmpty { get { return Count == 0; } } 
        /// <summary>Gets the number of elements contained in the queue.</summary> 
        public int Count 
        { 
            get { lock (_syncLock) return _minHeap.Count; } 
        } 
        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        /// <remarks>The elements will not be copied to the array in any guaranteed order.</remarks> 
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
        { 
            lock (_syncLock) _minHeap.Items.CopyTo(array, index); 
        } 
        /// <summary>Copies the elements stored in the queue to a new array.</summary> 
        /// <returns>A new array containing a snapshot of elements copied from the queue.</returns> 
        public KeyValuePair<TKey, TValue>[] ToArray() 
        { 
            lock (_syncLock) 
            { 
                var clonedHeap = new MinBinaryHeap(_minHeap); 
                var result = new KeyValuePair<TKey, TValue>[_minHeap.Count]; 
                for (int i = 0; i < result.Length; i++) 
                { 
                    result[i] = clonedHeap.Remove(); 
                } 
                return result; 
            } 
        } 
        /// <summary>Attempts to add an item in the queue.</summary> 
        /// <param name="item">The key/value pair to be added.</param> 
        /// <returns> 
        /// true if the pair was added; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item) 
        { 
            Enqueue(item); 
            return true; 
        } 
        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="item"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue successfully; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryTake(out KeyValuePair<TKey, TValue> item) 
        { 
            return TryDequeue(out item); 
        } 
        /// <summary>Returns an enumerator that iterates through the collection.</summary> 
        /// <returns>An enumerator for the contents of the queue.</returns> 
        /// <remarks> 
        /// The enumeration represents a moment-in-time snapshot of the contents of the queue. It does not 
        /// reflect any updates to the collection after GetEnumerator was called. The enumerator is safe to 
        /// use concurrently with reads from and writes to the queue. 
        /// </remarks> 
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
        { 
            var arr = ToArray(); 
            return ((IEnumerable<KeyValuePair<TKey, TValue>>)arr).GetEnumerator(); 
        } 
        /// <summary>Returns an enumerator that iterates through a collection.</summary> 
        /// <returns>An IEnumerator that can be used to iterate through the collection.</returns> 
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 
        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        void ICollection.CopyTo(Array array, int index) 
        { 
            lock (_syncLock) ((ICollection)_minHeap.Items).CopyTo(array, index); 
        } 
        /// <summary> 
        /// Gets a value indicating whether access to the ICollection is synchronized with the SyncRoot. 
        /// </summary> 
        bool ICollection.IsSynchronized { get { return true; } } 
        /// <summary> 
        /// Gets an object that can be used to synchronize access to the collection. 
        /// </summary> 
        object ICollection.SyncRoot { get { return _syncLock; } } 
        /// <summary>Implements a binary heap that prioritizes smaller values.</summary> 
        private sealed class MinBinaryHeap 
        { 
            private readonly List<KeyValuePair<TKey, TValue>> _items; 
            /// <summary>Initializes an empty heap.</summary> 
            public MinBinaryHeap() 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(); 
            } 
            /// <summary>Initializes a heap as a copy of another heap instance.</summary> 
            /// <param name="heapToCopy">The heap to copy.</param> 
            /// <remarks>Key/Value values are not deep cloned.</remarks> 
            public MinBinaryHeap(MinBinaryHeap heapToCopy) 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(heapToCopy.Items); 
            } 
            /// <summary>Empties the heap.</summary> 
            public void Clear() { _items.Clear(); } 
            /// <summary>Adds an item to the heap.</summary> 
            public void Insert(KeyValuePair<TKey,TValue> entry) 
            { 
                // Add the item to the list, making sure to keep track of where it was added. 
                _items.Add(entry); 
                int pos = _items.Count - 1; 
                // If the new item is the only item, we're done. 
                if (pos == 0) return; 
                // Otherwise, perform log(n) operations, walking up the tree, swapping 
                // where necessary based on key values 
                while (pos > 0) 
                { 
                    // Get the next position to check 
                    int nextPos = (pos-1) / 2; 
                    // Extract the entry at the next position 
                    var toCheck = _items[nextPos]; 
                    // Compare that entry to our new one.  If our entry has a smaller key, move it up. 
                    // Otherwise, we're done. 
                    if (entry.Key.CompareTo(toCheck.Key) < 0) 
                    { 
                        _items[pos] = toCheck; 
                        pos = nextPos; 
                    } 
                    else break; 
                } 
                // Make sure we put this entry back in, just in case 
                _items[pos] = entry; 
            } 
            /// <summary>Returns the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Peek() 
            { 
                // Returns the first item 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                return _items[0]; 
            } 
            /// <summary>Removes the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Remove() 
            { 
                // Get the first item and save it for later (this is what will be returned). 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                KeyValuePair<TKey, TValue> toReturn = _items[0]; 
                // Remove the first item if there will only be 0 or 1 items left after doing so.   
                if (_items.Count <= 2) _items.RemoveAt(0); 
                // A reheapify will be required for the removal 
                else 
                { 
                    // Remove the first item and move the last item to the front. 
                    _items[0] = _items[_items.Count - 1]; 
                    _items.RemoveAt(_items.Count - 1); 
                    // Start reheapify 
                    int current = 0, possibleSwap = 0; 
                    // Keep going until the tree is a heap 
                    while (true) 
                    { 
                        // Get the positions of the node's children 
                        int leftChildPos = 2 * current + 1; 
                        int rightChildPos = leftChildPos + 1; 
                        // Should we swap with the left child? 
                        if (leftChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[current]; 
                            var entry2 = _items[leftChildPos]; 
                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = leftChildPos; 
                        } 
                        else break; // if can't swap this, we're done 
                        // Should we swap with the right child?  Note that now we check with the possible swap 
                        // position (which might be current and might be left child). 
                        if (rightChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (the possible-swap candidate and the right child) 
                            var entry1 = _items[possibleSwap]; 
                            var entry2 = _items[rightChildPos]; 
                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = rightChildPos; 
                        } 
                        // Now swap current and possible swap if necessary 
                        if (current != possibleSwap) 
                        { 
                            var temp = _items[current]; 
                            _items[current] = _items[possibleSwap]; 
                            _items[possibleSwap] = temp; 
                        } 
                        else break; // if nothing to swap, we're done 
                        // Update current to the location of the swap 
                        current = possibleSwap; 
                    } 
                } 
                // Return the item from the heap 
                return toReturn; 
            } 
            /// <summary>Gets the number of objects stored in the heap.</summary> 
            public int Count { get { return _items.Count; } } 
            internal List<KeyValuePair<TKey, TValue>> Items { get { return _items; } } 
        }
        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);
        private readonly object _syncLock = new object();
        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();
    } 
}

The test program:

using System;
using System.Diagnostics;
using System.Threading;
namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);
            int threadCount = 8;
            int maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4);  // Kludge!
            var stopwatch = new Stopwatch();
            _numBlocks = maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelBlockProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);
            Console.WriteLine("'n'nFinished in " + stopwatch.Elapsed + "'n'n");
        }
        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }
            var result = new byte[128];
            result[0] = (byte)_numBlocks;
            Console.WriteLine("Supplied input: " + _numBlocks);
            return result;
        }
        private static byte[] process(byte[] data)
        {
            if (data[0] == 190)/*!*/
            {
                Thread.Sleep(5000);
            }
            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }
        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }
        private static Random _rng;
        private static int _numBlocks;
    }
}
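
For the frame scenario in the question, the wiring might look roughly like this (a sketch only: vreader, totalframes, Process and WriteToFile are the asker's, and the read delegate returns null after the last frame to signal the end of input, which is what ParallelBlockProcessor expects):

// Hypothetical adaptation to the question's frame loop.
long framesRead = 0;
var frameProcessor = new ParallelBlockProcessor<Bitmap>(
    read: () => framesRead++ < totalframes ? vreader.ReadVideoFrame() : null,
    process: frame => { Process(frame); return frame; },   // called by multiple threads
    write: frame => { WriteToFile(frame); frame.Dispose(); }, // called by one thread, in order
    numTasks: Environment.ProcessorCount);
frameProcessor.WaitForFinished(Timeout.Infinite);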

You can use a CountdownEvent, which allows you to wait for multiple threads.

Example:

static CountdownEvent _countdown = new CountdownEvent(3);
static void Main()
{
  new Thread (SaySomething).Start ("I am thread 1");
  new Thread (SaySomething).Start ("I am thread 2");
  new Thread (SaySomething).Start ("I am thread 3");
  _countdown.Wait();   // Blocks until Signal has been called 3 times
  Console.WriteLine ("All threads have finished speaking!");
}
static void SaySomething (object thing)
{
  Thread.Sleep (1000);
  Console.WriteLine (thing);
  _countdown.Signal();
}

This code doesn't guarantee that threads 1-3 will execute in that exact order, but if you call the Signal method first, I believe that should take care of it.

Another, more effective approach worth pursuing is the Monitor.Pulse() and Monitor.Wait() mechanism, used together with Thread.Sleep. The theory is that when a thread finishes executing its critical section (processing a frame, in your case), you put it to sleep and pulse the waiting threads, repeating until all frames are done and the threads are woken up.... Threads are tricky, because it's hard to know when they will finish executing.
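
A very rough sketch of that idea, assuming each worker thread is handed a frame together with its index (the field and method names below are illustrative):

private static readonly object _gate = new object();
private static long _nextToWrite; // index of the next frame allowed to be written

private static void ProcessAndWrite(long index, Bitmap frame)
{
    Process(frame); // the CPU-heavy part runs in parallel across threads

    lock (_gate)
    {
        // Sleep until every earlier frame has been written.
        while (_nextToWrite != index)
        {
            Monitor.Wait(_gate);
        }

        WriteToFile(frame); // only one thread writes at a time, in frame order
        frame.Dispose();
        _nextToWrite++;

        // Wake the waiting threads so the next frame can take its turn.
        Monitor.PulseAll(_gate);
    }
}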