如何在多线程中并行运行相关任务

本文关键字:运行 任务 并行 多线程 | 更新日期: 2023-09-27 18:11:01

我有一大批任务要用 C# 执行。每次计算都会产生一份结果数据,我想把它写入文件(我使用的是 SQLite)。目前我是按顺序执行的:[Task1 -> FileSaving1]、[Task2 -> FileSaving2],依此类推。

但我的首要目标是尽快完成所有计算,所以我想在多个线程中并行运行计算,并在另一个线程中保存文件。每当一个计算结束、数据可以写入时,就通知文件保存线程。文件保存可以按顺序进行,也可以并行进行。

如何在 C# 中实现这一点?我使用的是 .NET 4.0。如果可能的话,请提供一些示例。

如何在多线程中并行运行相关任务

您可以使用BlockingCollection<T>来帮助完成此操作。

棘手之处在于:你希望由多个线程来处理工作项,但它们产生输出的顺序是随机的,因此在写入输出时需要对其进行多路复用(这里假设你希望按照与原来单线程方案相同的顺序写入数据)。

我写了一个类来做这个。

它假设你可以把每个"工作项"封装在一个类的实例中。这些实例被添加到工作队列中;然后多个线程(通过 Task)从工作队列中取出工作项并进行处理,再把结果放入一个优先级队列。

最后,另一个线程从已完成队列中取出已完成的工作项,并小心地对它们进行多路复用,使这些项按照最初加入工作队列时的顺序被取出。

这个实现会为你创建和管理线程。你需要告诉它使用多少个工作线程,并提供三个委托:提供新工作项的 Read()、处理每个工作项的 Process(),以及输出每个工作项的 Write()。

只有Process()委托可以被多个线程调用。

注意,如果你不关心顺序,就可以省去这些复杂的处理,直接使用 BlockingCollection<T>。

代码如下:

/// <summary>
/// Runs work items through an ordered pipeline: one thread reads input items, several worker
/// threads process them concurrently, and one thread writes the results in the same order in
/// which they were read.
/// </summary>
/// <typeparam name="T">The work item type.</typeparam>
/// <remarks>
/// At most <c>numWorkers * 2</c> items are read but not yet written at any time, which bounds
/// memory use when a slow item holds up in-order output.
/// NOTE(review): exceptions are not propagated. If Read or Process throws, that pipeline stage
/// stops and <see cref="WaitForFinished"/> with Timeout.Infinite never returns. An exception
/// from Write faults the writer task and surfaces as an AggregateException from WaitForFinished.
/// </remarks>
public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type.
{
    /// <summary>Supplies the next input item, or null to signal end of input. Called by only one thread.</summary>
    public delegate T    Read();
    /// <summary>Processes one item and returns the result. Called simultaneously by multiple threads.</summary>
    public delegate T    Process(T block);
    /// <summary>Receives each processed result, in input order. Called by only one thread.</summary>
    public delegate void Write(T block);

    /// <summary>Creates the pipeline and immediately starts reading, processing and writing.</summary>
    /// <param name="read">Supplies input items; returning null ends the input.</param>
    /// <param name="process">Transforms one item; may be called concurrently.</param>
    /// <param name="write">Receives the results in input order.</param>
    /// <param name="numWorkers">Number of worker threads; values &lt;= 0 mean Environment.ProcessorCount.</param>
    /// <exception cref="ArgumentNullException">A delegate is null.</exception>
    public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0)
    {
        // Fail fast: a null delegate would otherwise fault a background task and hang the pipeline.
        if (read == null)    throw new ArgumentNullException("read");
        if (process == null) throw new ArgumentNullException("process");
        if (write == null)   throw new ArgumentNullException("write");

        _read    = read;
        _process = process;
        _write   = write;
        numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount;
        _workPool    = new SemaphoreSlim(numWorkers*2);
        _inputQueue  = new BlockingCollection<WorkItem>(numWorkers);
        _outputQueue = new ConcurrentPriorityQueue<int, WorkItem>();
        _workers     = new Task[numWorkers];
        startWorkers();
        // The reader and writer block for the whole lifetime of the pipeline, so give them
        // dedicated threads instead of occupying (and waiting for) thread-pool threads.
        Task.Factory.StartNew(enqueueWorkItems, TaskCreationOptions.LongRunning);
        _multiplexor = Task.Factory.StartNew(multiplex, TaskCreationOptions.LongRunning);
    }

    /// <summary>Starts one long-running task per worker slot.</summary>
    private void startWorkers()
    {
        for (int i = 0; i < _workers.Length; ++i)
        {
            _workers[i] = Task.Factory.StartNew(processBlocks, TaskCreationOptions.LongRunning);
        }
    }

    /// <summary>Reads items, numbers them sequentially and feeds them to the workers.</summary>
    private void enqueueWorkItems()
    {
        int index = 0;
        while (true)
        {
            T data = _read();
            if (data == null) // Signals end of input.
            {
                _inputQueue.CompleteAdding();
                // Sentinel: a null WorkItem whose key is the total number of items read.
                _outputQueue.Enqueue(index, null);
                break;
            }
            _workPool.Wait(); // Blocks while too many items are in flight.
            _inputQueue.Add(new WorkItem(data, index++));
        }
    }

    /// <summary>Writes processed items strictly in index order, stopping after the sentinel.</summary>
    private void multiplex()
    {
        int index = 0;            // Next required index.
        int last  = int.MaxValue; // Becomes the item count once the sentinel has been seen.
        while (index != last)
        {
            KeyValuePair<int, WorkItem> entry;
            _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item.
            while ((index != last) && _outputQueue.TryPeek(out entry))
            {
                if (entry.Value == null) // Only the sentinel has a null WorkItem.
                {
                    last = entry.Key; // The sentinel's key is the index of the last block + 1.
                    if (index != last)
                    {
                        // The remaining items are still being processed. Block until one arrives
                        // instead of spinning on the sentinel, which stays at the head of the queue.
                        _outputQueue.WaitForNewItem();
                    }
                }
                else if (entry.Key == index) // Is this block the next one that we want?
                {
                    // Every smaller index has already been written and keys are unique, so any item
                    // added meanwhile has a larger key: the head we dequeue is the one we peeked.
                    _outputQueue.TryDequeue(out entry);
                    Contract.Assume(entry.Key == index); // This *must* be the case.
                    _workPool.Release();                 // Allow the enqueuer to queue another work item.
                    _write(entry.Value.Data);
                    ++index;
                }
                else // Not the block we want yet; the missing one will be enqueued at some point.
                {
                    _outputQueue.WaitForNewItem();
                }
            }
        }
    }

    /// <summary>Worker loop: processes items until the input queue is completed and drained.</summary>
    private void processBlocks()
    {
        foreach (var block in _inputQueue.GetConsumingEnumerable())
        {
            var processedData = _process(block.Data);
            // Wrap the result so that a null result can't be mistaken for the end-of-input sentinel.
            _outputQueue.Enqueue(block.Index, new WorkItem(processedData, block.Index));
        }
    }

    /// <summary>Waits until every item has been written.</summary>
    /// <param name="maxMillisecondsToWait">Timeout in milliseconds; can be Timeout.Infinite.</param>
    /// <returns>true if all items were written within the timeout; otherwise false.</returns>
    public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
    {
        return _multiplexor.Wait(maxMillisecondsToWait);
    }

    /// <summary>An item's payload together with its position in the input sequence.</summary>
    private sealed class WorkItem
    {
        public WorkItem(T data, int index)
        {
            Data  = data;
            Index = index;
        }
        public T   Data  { get; private set; }
        public int Index { get; private set; }
    }

    private readonly Task[] _workers;
    private readonly Task _multiplexor;
    private readonly SemaphoreSlim _workPool;
    private readonly BlockingCollection<WorkItem> _inputQueue;
    private readonly ConcurrentPriorityQueue<int, WorkItem> _outputQueue;
    private readonly Read    _read;
    private readonly Process _process;
    private readonly Write   _write;
}

下面是它的测试代码:

namespace Demo
{
    /// <summary>
    /// Exercises ParallelWorkProcessor. It supplies 200 dummy 128-byte blocks whose first byte
    /// is a 1-based ID and processes them on 8 workers with random delays. Block 10 is
    /// deliberately held for an extra 5 s. The results are then printed in input order.
    /// </summary>
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);
            int threadCount = 8;
            _maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup.
            _numBlocks = _maxBlocks;
            var stopwatch = Stopwatch.StartNew();
            var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);
            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        /// <summary>Supplies the next block (IDs 1.._maxBlocks), then null to end the input.</summary>
        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }
            var result = new byte[128];
            result[0] = (byte)(_maxBlocks-_numBlocks);
            Console.WriteLine("Supplied input: " + result[0]);
            return result;
        }

        /// <summary>Simulates work by sleeping 10-59 ms; called concurrently by several workers.</summary>
        private static byte[] process(byte[] data)
        {
            if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item!
            {
                Console.WriteLine("Delaying a call to process() for 5s for ID 10");
                Thread.Sleep(5000);
            }
            int delay;
            // System.Random is not thread-safe and this method runs on several workers at once,
            // so serialize access to the shared generator.
            lock (_rngLock)
            {
                delay = 10 + _rng.Next(50);
            }
            Thread.Sleep(delay);
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        /// <summary>Prints each result; the processor calls this in input order.</summary>
        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static readonly object _rngLock = new object();
        private static Random _rng;
        private static int _numBlocks;
        private static int _maxBlocks;
    }
}

这段代码还依赖于一个 ConcurrentPriorityQueue 的实现(原答案在此处给出了该实现的链接)。

我需要对它稍作修改,下面是修改后的版本:

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace ConsoleApplication1
{
    /// <summary>Provides a thread-safe priority queue data structure.</summary>
    /// <typeparam name="TKey">Specifies the type of keys used to prioritize values; the smallest key is dequeued first.</typeparam>
    /// <typeparam name="TValue">Specifies the type of elements in the queue.</typeparam>
    /// <remarks>
    /// All operations take a single internal lock. In addition to the IProducerConsumerCollection
    /// members, the queue offers WaitForNewItem, which blocks until an item is enqueued. Dispose the
    /// queue to release the wait handle behind WaitForNewItem.
    /// </remarks>
    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")]
    public sealed class ConcurrentPriorityQueue<TKey, TValue> :
        IProducerConsumerCollection<KeyValuePair<TKey, TValue>>, IDisposable
        where TKey : IComparable<TKey>
    {
        /// <summary>Initializes a new, empty instance of the ConcurrentPriorityQueue class.</summary>
        public ConcurrentPriorityQueue() {}

        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class that contains elements copied from the specified collection.</summary>
        /// <param name="collection">The collection whose elements are copied to the new ConcurrentPriorityQueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
        public ConcurrentPriorityQueue(IEnumerable<KeyValuePair<TKey, TValue>> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            foreach (var item in collection) _minHeap.Insert(item);
            // Signal the seeded items so that a consumer which starts with WaitForNewItem()
            // doesn't block even though the queue already has items.
            if (_minHeap.Count > 0) _newItem.Set();
        }

        /// <summary>Adds the key/value pair to the priority queue.</summary>
        /// <param name="priority">The priority of the item to be added.</param>
        /// <param name="value">The item to be added.</param>
        public void Enqueue(TKey priority, TValue value)
        {
            Enqueue(new KeyValuePair<TKey, TValue>(priority, value));
        }

        /// <summary>Adds the key/value pair to the priority queue and signals WaitForNewItem.</summary>
        /// <param name="item">The key/value pair to be added to the queue.</param>
        public void Enqueue(KeyValuePair<TKey, TValue> item)
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }

        /// <summary>Blocks until an item has been enqueued since the last successful wait.</summary>
        /// <remarks>
        /// The signal is an AutoResetEvent. Several enqueues between waits collapse into a single
        /// signal, and each signal releases only one waiting thread. A signal may also be stale: the
        /// item that raised it may already have been dequeued. Callers should therefore re-inspect
        /// the queue after waking.
        /// </remarks>
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }

        /// <summary>Blocks until an item has been enqueued or the timeout elapses.</summary>
        /// <param name="millisecondsTimeout">Maximum time to wait in milliseconds, or Timeout.Infinite.</param>
        /// <returns>true if an enqueue signal was received; false if the timeout elapsed.</returns>
        /// <remarks>Has the same signalling semantics as <see cref="WaitForNewItem()"/>.</remarks>
        public bool WaitForNewItem(int millisecondsTimeout)
        {
            return _newItem.WaitOne(millisecondsTimeout);
        }

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary>
        /// <param name="result">
        /// When this method returns, if the operation was successful, result contains the object removed. If
        /// no object was available to be removed, the value is unspecified.
        /// </param>
        /// <returns>
        /// true if an element was removed and returned from the queue successfully; otherwise, false.
        /// </returns>
        public bool TryDequeue(out KeyValuePair<TKey, TValue> result)
        {
            result = default(KeyValuePair<TKey, TValue>);
            lock (_syncLock)
            {
                if (_minHeap.Count > 0)
                {
                    result = _minHeap.Remove();
                    return true;
                }
            }
            return false;
        }

        /// <summary>Attempts to return the next prioritized item in the queue without removing it.</summary>
        /// <param name="result">
        /// When this method returns, if the operation was successful, result contains the object.
        /// The queue was not modified by the operation.
        /// </param>
        /// <returns>
        /// true if an element was returned from the queue successfully; otherwise, false.
        /// </returns>
        public bool TryPeek(out KeyValuePair<TKey, TValue> result)
        {
            result = default(KeyValuePair<TKey, TValue>);
            lock (_syncLock)
            {
                if (_minHeap.Count > 0)
                {
                    result = _minHeap.Peek();
                    return true;
                }
            }
            return false;
        }

        /// <summary>Empties the queue.</summary>
        public void Clear() { lock (_syncLock) _minHeap.Clear(); }

        /// <summary>Gets whether the queue is empty.</summary>
        public bool IsEmpty { get { return Count == 0; } }

        /// <summary>Gets the number of elements contained in the queue.</summary>
        public int Count
        {
            get { lock (_syncLock) return _minHeap.Count; }
        }

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary>
        /// <param name="array">
        /// The one-dimensional array that is the destination of the elements copied from the queue.
        /// </param>
        /// <param name="index">
        /// The zero-based index in array at which copying begins.
        /// </param>
        /// <remarks>The elements will not be copied to the array in any guaranteed order.</remarks>
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        {
            lock (_syncLock) _minHeap.Items.CopyTo(array, index);
        }

        /// <summary>Copies the elements stored in the queue to a new array, in priority order.</summary>
        /// <returns>A new array containing a snapshot of elements copied from the queue.</returns>
        public KeyValuePair<TKey, TValue>[] ToArray()
        {
            lock (_syncLock)
            {
                // Drain a copy of the heap so that the snapshot comes out sorted by key.
                var clonedHeap = new MinBinaryHeap(_minHeap);
                var result = new KeyValuePair<TKey, TValue>[_minHeap.Count];
                for (int i = 0; i < result.Length; i++)
                {
                    result[i] = clonedHeap.Remove();
                }
                return result;
            }
        }

        /// <summary>Attempts to add an item in the queue.</summary>
        /// <param name="item">The key/value pair to be added.</param>
        /// <returns>Always true; the queue is unbounded.</returns>
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
        {
            Enqueue(item);
            return true;
        }

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary>
        /// <param name="item">
        /// When this method returns, if the operation was successful, result contains the object removed. If
        /// no object was available to be removed, the value is unspecified.
        /// </param>
        /// <returns>
        /// true if an element was removed and returned from the queue successfully; otherwise, false.
        /// </returns>
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryTake(out KeyValuePair<TKey, TValue> item)
        {
            return TryDequeue(out item);
        }

        /// <summary>Returns an enumerator that iterates through the collection.</summary>
        /// <returns>An enumerator for the contents of the queue.</returns>
        /// <remarks>
        /// The enumeration represents a moment-in-time snapshot of the contents of the queue. It does not
        /// reflect any updates to the collection after GetEnumerator was called. The enumerator is safe to
        /// use concurrently with reads from and writes to the queue.
        /// </remarks>
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        {
            var arr = ToArray();
            return ((IEnumerable<KeyValuePair<TKey, TValue>>)arr).GetEnumerator();
        }

        /// <summary>Returns an enumerator that iterates through a collection.</summary>
        /// <returns>An IEnumerator that can be used to iterate through the collection.</returns>
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary>
        /// <param name="array">
        /// The one-dimensional array that is the destination of the elements copied from the queue.
        /// </param>
        /// <param name="index">
        /// The zero-based index in array at which copying begins.
        /// </param>
        void ICollection.CopyTo(Array array, int index)
        {
            lock (_syncLock) ((ICollection)_minHeap.Items).CopyTo(array, index);
        }

        /// <summary>
        /// Gets a value indicating whether access to the ICollection is synchronized with the SyncRoot.
        /// </summary>
        bool ICollection.IsSynchronized { get { return true; } }

        /// <summary>
        /// Gets an object that can be used to synchronize access to the collection.
        /// </summary>
        object ICollection.SyncRoot { get { return _syncLock; } }

        /// <summary>Releases the wait handle used by WaitForNewItem.</summary>
        /// <remarks>
        /// Call only once no other thread can still enqueue or wait. Enqueue and WaitForNewItem
        /// use the released handle and will fail afterwards.
        /// </remarks>
        public void Dispose()
        {
            _newItem.Dispose();
        }

        /// <summary>Implements a binary heap that prioritizes smaller keys. Not thread-safe on its own.</summary>
        private sealed class MinBinaryHeap
        {
            private readonly List<KeyValuePair<TKey, TValue>> _items;

            /// <summary>Initializes an empty heap.</summary>
            public MinBinaryHeap()
            {
                _items = new List<KeyValuePair<TKey, TValue>>();
            }

            /// <summary>Initializes a heap as a copy of another heap instance.</summary>
            /// <param name="heapToCopy">The heap to copy.</param>
            /// <remarks>Key/Value values are not deep cloned.</remarks>
            public MinBinaryHeap(MinBinaryHeap heapToCopy)
            {
                _items = new List<KeyValuePair<TKey, TValue>>(heapToCopy.Items);
            }

            /// <summary>Empties the heap.</summary>
            public void Clear() { _items.Clear(); }

            /// <summary>Adds an item to the heap in O(log n).</summary>
            public void Insert(KeyValuePair<TKey, TValue> entry)
            {
                _items.Add(entry);
                int pos = _items.Count - 1;
                // Sift up: shift parents with larger keys down until the new entry's slot is found.
                while (pos > 0)
                {
                    int parentPos = (pos - 1) / 2;
                    var parent = _items[parentPos];
                    if (entry.Key.CompareTo(parent.Key) >= 0) break;
                    _items[pos] = parent;
                    pos = parentPos;
                }
                _items[pos] = entry;
            }

            /// <summary>Returns the entry at the top of the heap.</summary>
            /// <exception cref="InvalidOperationException">The heap is empty.</exception>
            public KeyValuePair<TKey, TValue> Peek()
            {
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty.");
                return _items[0];
            }

            /// <summary>Removes and returns the entry at the top of the heap in O(log n).</summary>
            /// <exception cref="InvalidOperationException">The heap is empty.</exception>
            public KeyValuePair<TKey, TValue> Remove()
            {
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty.");
                KeyValuePair<TKey, TValue> toReturn = _items[0];
                if (_items.Count <= 2)
                {
                    // At most one item remains afterwards, which is trivially a heap.
                    _items.RemoveAt(0);
                }
                else
                {
                    // Move the last item to the root, then sift it down.
                    _items[0] = _items[_items.Count - 1];
                    _items.RemoveAt(_items.Count - 1);
                    int current = 0;
                    while (true)
                    {
                        int leftChildPos = 2 * current + 1;
                        int rightChildPos = leftChildPos + 1;
                        if (leftChildPos >= _items.Count) break; // Leaf: nothing to swap with.

                        int smallest = current;
                        if (_items[leftChildPos].Key.CompareTo(_items[smallest].Key) < 0)
                        {
                            smallest = leftChildPos;
                        }
                        if (rightChildPos < _items.Count &&
                            _items[rightChildPos].Key.CompareTo(_items[smallest].Key) < 0)
                        {
                            smallest = rightChildPos;
                        }
                        if (smallest == current) break; // Heap property restored.

                        var temp = _items[current];
                        _items[current] = _items[smallest];
                        _items[smallest] = temp;
                        current = smallest;
                    }
                }
                return toReturn;
            }

            /// <summary>Gets the number of objects stored in the heap.</summary>
            public int Count { get { return _items.Count; } }

            /// <summary>The backing list, in heap (not sorted) order.</summary>
            internal List<KeyValuePair<TKey, TValue>> Items { get { return _items; } }
        }

        // Set on every enqueue (and by the seeding constructor); consumed by WaitForNewItem.
        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);
        private readonly object _syncLock = new object();
        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();
    }
}