如何在多线程中添加消费者

本文关键字:添加 消费者 多线程 | 更新日期: 2023-09-27 18:19:04

这个程序用于同步使用监视器,它有一个生产者和消费者以及将读取和写入的缓冲区我怎么能再增加2个消费者(所以我有3个在这个程序),程序仍然正常工作?

  // Synchronized.cs
    // Showing multiple threads modifying a shared object with synchronization.
    using System;
    using System.Threading;
    namespace Synchronized
    {
        // this class synchronizes access to an integer
        public class HoldIntegerSynchronized
        {
            // buffer shared by producer and consumer threads 
            private int buffer = -1;
            // occupiedBufferCount maintains count of occupied buffers
            private int occupiedBufferCount = 0;
            // property Buffer
            public int Buffer
            {
                get
                {
                    // obtain lock on this object
                    Monitor.Enter(this);
                    // if there is no data to read, place invoking 
                    // thread in WaitSleepJoin state
                    if (occupiedBufferCount == 0)
                    {
                        Console.WriteLine(
                           Thread.CurrentThread.Name + " tries to read.");
                        DisplayState("Buffer empty. " +
                           Thread.CurrentThread.Name + " waits.");
                        Monitor.Wait(this);
                    }
                    // indicate that producer can store another value 
                    // because a consumer just retrieved buffer value
                   --occupiedBufferCount;
                  //  occupiedBufferCount -= 1;
                    DisplayState(
                       Thread.CurrentThread.Name + " reads " + buffer);
                    // tell waiting thread (if there is one) to 
                    // become ready to execute (Started state)
                    Monitor.Pulse(this);
                    // Get copy of buffer before releasing lock. 
                    // It is possible that the producer could be
                    // assigned the processor immediately after the
                    // monitor is released and before the return 
                    // statement executes. In this case, the producer 
                    // would assign a new value to buffer before the 
                    // return statement returns the value to the 
                    // consumer. Thus, the consumer would receive the 
                    // new value. Making a copy of buffer and 
                    // returning the copy ensures that the
                    // consumer receives the proper value.
                    int bufferCopy = buffer;
                    // release lock on this object
                    Monitor.Exit(this);
                    return bufferCopy;
                } // end get
                set
                {
                    // acquire lock for this object
                    Monitor.Enter(this);
                    // if there are no empty locations, place invoking
                    // thread in WaitSleepJoin state
                    if (occupiedBufferCount == 1)
                    {
                        Console.WriteLine(
                           Thread.CurrentThread.Name + " tries to write.");
                        DisplayState("Buffer full. " +
                           Thread.CurrentThread.Name + " waits.");
                        Monitor.Wait(this);
                    }
                    // set new sharedInt value
                    buffer = value;
                    // indicate producer cannot store another value 
                    // until consumer retrieves current sharedInt value
                    ++occupiedBufferCount;
                 //   occupiedBufferCount += 1;
                    DisplayState(
                       Thread.CurrentThread.Name + " writes " + buffer);
                    // tell waiting thread (if there is one) to 
                    // become ready to execute (Started state)
                    Monitor.Pulse(this);
                    // release lock on this object
                    Monitor.Exit(this);
                } // end set
            } // end property Buffer
            // display current operation and buffer state
            public void DisplayState(string operation)
            {
                Console.WriteLine("{0,-35}{1,-9}{2}'n",
                   operation, buffer, occupiedBufferCount);
            }
        } // end class HoldIntegerSynchronized
        // class Producer's Produce method controls a thread that
        // stores values from 1 to 4 in sharedLocation
        class Producer
        {
            private HoldIntegerSynchronized sharedLocation;
            private Random randomSleepTime;
            // constructor
            public Producer(
               HoldIntegerSynchronized shared, Random random)
            {
                sharedLocation = shared;
                randomSleepTime = random;
            }
            // store values 1-4 in object sharedLocation
            public void Produce()
            {
                // sleep for random interval upto 3000 milliseconds
                // then set sharedLocation's Buffer property
                for (int count = 1; count <= 10; count++)
                {
                    Thread.Sleep(randomSleepTime.Next(1, 3000));
                    sharedLocation.Buffer = count;
                }
                Console.WriteLine(Thread.CurrentThread.Name +
                   " done producing.'nTerminating " +
                   Thread.CurrentThread.Name + ".'n");
            } // end method Produce
        } // end class Producer
        // class Consumer's Consume method controls a thread that
        // loops four times and reads a value from sharedLocation
        class Consumer
        {
            private HoldIntegerSynchronized sharedLocation;
            private Random randomSleepTime;
            // constructor
            public Consumer(
               HoldIntegerSynchronized shared, Random random)
            {
                sharedLocation = shared;
                randomSleepTime = random;
            }
            // read sharedLocation's value four times
            public void Consume()
            {
                int sum = 0;
                // get current thread
                Thread current = Thread.CurrentThread;
                // sleep for random interval upto 3000 milliseconds
                // then add sharedLocation's Buffer property value
                // to sum
                for (int count = 1; count <= 10; count++)
                {
                    Thread.Sleep(randomSleepTime.Next(1, 3000));
                    sum += sharedLocation.Buffer;
                }
                Console.WriteLine(Thread.CurrentThread.Name +
                   " read values totaling: " + sum +
                   ".'nTerminating " + Thread.CurrentThread.Name + ".'n");
            } // end method Consume
        } // end class Consumer
        // this class creates producer and consumer threads
        class SharedCell
        {
            // create producer and consumer threads and start them
            static void Main(string[] args)
            {
                // create shared object used by threads
                HoldIntegerSynchronized holdInteger =
                   new HoldIntegerSynchronized();
                // Random object used by each thread
                Random random = new Random();
                // create Producer and Consumer objects
                Producer producer =
                   new Producer(holdInteger, random);
                Consumer consumer =
                   new Consumer(holdInteger, random);
                Consumer consumer2=
                   new Consumer(holdInteger, random);
                Consumer consumer3=
                   new Consumer(holdInteger, random);
                // output column heads and initial buffer state
                Console.WriteLine("{0,-35}{1,-9}{2}'n",
                   "Operation", "Buffer", "Occupied Count");
                holdInteger.DisplayState("Initial state");
                // create threads for producer and consumer and set 
                // delegates for each thread
                Thread producerThread =
                   new Thread(new ThreadStart(producer.Produce));
                producerThread.Name = "Producer";
                Thread consumerThread =
                   new Thread(new ThreadStart(consumer.Consume));
                consumerThread.Name = "Consumer";

                // start each thread
                producerThread.Start();
                consumerThread.Start();
            } // end method Main
        } // end class SharedCell
    } // end end!.

如何在多线程中添加消费者

我发现在这种情况下,我应该编写一个类来实现线程安全和阻塞队列,该队列使用condvars(在。net中,Monitor.Pulse()和Monitor.PulseAll())来等待队列

此解决方案对于您想要添加和删除元素的任意多个线程都是安全的。

一个例子:

using System;
using System.Threading;

namespace MCSharp {

    /** <summary>
        A thread safe, blocking queue.</summary>
        <remarks>
        All members of this class are thread safe.</remarks>
    */
    public class MessageQueue<T> {

        private LinkedQueue<T> messagequeue=new LinkedQueue<T>();
        private Object waitobject=new Object();
        private Int32 numwaitingthreads=0;
        private Object emptyobject=new Object();

        /** <summary>
            Returns the number of items currently waiting in the queue.</summary>
        */
        public Int32 MessageCount {
            get {   lock (waitobject) return messagequeue.Count;    }
        }

        /** <summary>
            Returns the number of threads currently waiting for items to be added to the queue.</summary>
        */
        public Int32 ThreadCount {
            get {   lock (waitobject) return numwaitingthreads; }
        }

        /** <summary>
            Creates a new queue.</summary>
        */
        public MessageQueue () {    }

        /** <summary>
            Adds a new item to the back of the queue.</summary>
            <param name="message">
            The item to add to the queue.</param>
        */
        public void Enqueue (T message) {
            lock (waitobject) {
                messagequeue.Enqueue(message);
                Monitor.Pulse(waitobject);
            }
        }

        /** <summary>
            Removes an item from the front of the queue.</summary>
            <remarks>
            If there is currently no item at the front of the queue the thread will block
            until there is one, and then return with that item.</remarks>
            <returns>
            The item from the front of the queue.</returns>
        */
        public T Dequeue () {
            lock (waitobject) {
                while (messagequeue.Count==0) {
                    numwaitingthreads++;
                    Monitor.Wait(waitobject);
                    numwaitingthreads--;
                }
                lock (emptyobject) {
                    Monitor.PulseAll(emptyobject);
                    return messagequeue.Dequeue();
                }
            }
        }

        /** <summary>
            Waits for the queue to empty.</summary>
            <remarks>
            The calling thread blocks until the thread's <see cref="MCSharp.MessageQueue{T}.MessageCount">
            message count</see> is zero.</remarks>
        */
        public void WaitForEmpty () {
            while (true) {
                Monitor.Enter(waitobject);
                try {
                    if (messagequeue.Count==0) {
                        return;
                    }
                    Monitor.Enter(emptyobject);
                } finally {
                    Monitor.Exit(waitobject);
                }
                try {
                    Monitor.Wait(emptyobject);
                } finally {
                    Monitor.Exit(emptyobject);
                }
            }
        }

    }

}

它指的是"LinkedQueue"类,这里是:

using System;

namespace MCSharp {

    /** <summary>
        Implements a queue based around a singly linked list.</summary>
        <remarks>
        The .NET's built in queue implementation uses a dynamically-resizing array
        for its data storage.</remarks>
    */
    public class LinkedQueue<T> {

        private class SinglyLinkedListNode<NodeT> {

            public SinglyLinkedListNode<NodeT> Next=null;
            public NodeT Item;

            public SinglyLinkedListNode (NodeT item) {
                Item=item;
            }

        }

        private SinglyLinkedListNode<T> head=null;
        private SinglyLinkedListNode<T> tail=null;
        private Int32 count=0;

        /** <summary>
            Returns the number of items in the queue.</summary>
        */
        public Int32 Count {
            get {   return count;   }
        }

        /** <summary>
            Creates a new queue.</summary>
        */
        public LinkedQueue () { }

        /** <summary>
            Adds an item to the rear of the queue.</summary>
            <param name="item">
            The item to add to the queue.</param>
        */
        public void Enqueue (T item) {
            SinglyLinkedListNode<T> newnode=new SinglyLinkedListNode<T>(item);
            count++;
            if (head==null) {
                head=newnode;
                tail=newnode;
            } else {
                tail.Next=newnode;
                tail=newnode;
            }
        }

        /** <summary>
            Returns the item at the front of the queue.</summary>
            <returns>
            The item at the front of the queue.</returns>
        */
        public T Dequeue () {
            if (count==0) throw new InvalidOperationException();
            T returnthis=head.Item;
            if (head.Next==null) tail=null;
            head=head.Next;
            count--;
            return returnthis;
        }

    }

}