// Keywords: adding consumers, multithreading | Last updated: 2023-09-27 18:19:04
// Synchronized.cs
// Showing multiple threads modifying a shared object with synchronization.
using System;
using System.Threading;
namespace Synchronized
// This class synchronizes access to a single shared integer so that one
// value at a time is handed from a writer (producer) to a reader (consumer).
// Writers block while the slot is full; readers block while it is empty.
public class HoldIntegerSynchronized
{
   // private monitor object; locking on "this" would let unrelated code
   // take the same lock and interfere with the hand-off
   private readonly object syncRoot = new object();

   // buffer shared by producer and consumer threads
   private int buffer = -1;

   // occupiedBufferCount maintains count of occupied buffers (0 or 1)
   private int occupiedBufferCount = 0;

   // property Buffer
   //   get: blocks until a value is available, consumes it and returns it
   //   set: blocks until the slot is free, then stores the new value
   public int Buffer
   {
      get
      {
         // obtain lock on the monitor object; the lock statement releases
         // it even if an exception is thrown
         lock (syncRoot)
         {
            // while there is no data to read, place invoking thread in
            // WaitSleepJoin state. This must be a loop: with several
            // consumers another reader may have emptied the buffer again
            // before this thread reacquires the lock.
            while (occupiedBufferCount == 0)
            {
               Console.WriteLine(
                  Thread.CurrentThread.Name + " tries to read.");

               DisplayState("Buffer empty. " +
                  Thread.CurrentThread.Name + " waits.");

               Monitor.Wait(syncRoot);
            }

            // indicate that producer can store another value
            // because a consumer just retrieved buffer value
            occupiedBufferCount -= 1;

            DisplayState(
               Thread.CurrentThread.Name + " reads " + buffer);

            // wake every waiting thread; readers and writers wait on the
            // same monitor, so pulsing only one could wake another reader
            // and leave the writer asleep forever
            Monitor.PulseAll(syncRoot);

            // the returned value is read while the lock is still held, so
            // a writer cannot replace it before the caller receives it
            return buffer;
         }
      } // end get

      set
      {
         // acquire lock for the monitor object
         lock (syncRoot)
         {
            // while there are no empty locations, place invoking
            // thread in WaitSleepJoin state
            while (occupiedBufferCount == 1)
            {
               Console.WriteLine(
                  Thread.CurrentThread.Name + " tries to write.");

               DisplayState("Buffer full. " +
                  Thread.CurrentThread.Name + " waits.");

               Monitor.Wait(syncRoot);
            }

            // set new buffer value
            buffer = value;

            // indicate producer cannot store another value
            // until consumer retrieves current buffer value
            occupiedBufferCount += 1;

            DisplayState(
               Thread.CurrentThread.Name + " writes " + buffer);

            // wake every waiting thread so that a blocked reader is
            // guaranteed to be among them
            Monitor.PulseAll(syncRoot);
         }
      } // end set
   } // end property Buffer

   // display current operation and buffer state as one table row:
   // operation text, buffer value, occupied count
   // NOTE(review): the column format string was missing from the source
   // and has been reconstructed - confirm the intended widths
   public void DisplayState(string operation)
   {
      // the monitor is reentrant, so this is safe when called from the
      // Buffer accessors and also gives outside callers a consistent pair
      lock (syncRoot)
      {
         Console.WriteLine("{0,-35}{1,-9}{2}\n",
            operation, buffer, occupiedBufferCount);
      }
   }
} // end class HoldIntegerSynchronized
// class Producer's Produce method controls a thread that
// stores the values 1 through 10 in sharedLocation
class Producer
{
   private HoldIntegerSynchronized sharedLocation;
   private Random randomSleepTime;

   // constructor
   //   shared: the synchronized cell that values are written to
   //   random: source of the random delays between writes
   public Producer(
      HoldIntegerSynchronized shared, Random random)
   {
      sharedLocation = shared;
      randomSleepTime = random;
   }

   // store values 1-10 in object sharedLocation
   public void Produce()
   {
      // sleep for a random interval of 1 to 2999 milliseconds
      // then set sharedLocation's Buffer property
      for (int count = 1; count <= 10; count++)
      {
         Thread.Sleep(randomSleepTime.Next(1, 3000));
         sharedLocation.Buffer = count;
      }

      // "\n" restored here: the source carried the garbled sequence 'n
      Console.WriteLine(Thread.CurrentThread.Name +
         " done producing.\nTerminating " +
         Thread.CurrentThread.Name + ".\n");
   } // end method Produce
} // end class Producer
// class Consumer's Consume method controls a thread that
// loops ten times and reads a value from sharedLocation
class Consumer
{
   private HoldIntegerSynchronized sharedLocation;
   private Random randomSleepTime;

   // constructor
   //   shared: the synchronized cell that values are read from
   //   random: source of the random delays between reads
   public Consumer(
      HoldIntegerSynchronized shared, Random random)
   {
      sharedLocation = shared;
      randomSleepTime = random;
   }

   // read sharedLocation's value ten times and report the total
   public void Consume()
   {
      int sum = 0;

      // sleep for a random interval of 1 to 2999 milliseconds
      // then add sharedLocation's Buffer property value
      // to sum
      for (int count = 1; count <= 10; count++)
      {
         Thread.Sleep(randomSleepTime.Next(1, 3000));
         sum += sharedLocation.Buffer;
      }

      // "\n" restored here: the source carried the garbled sequence 'n
      Console.WriteLine(Thread.CurrentThread.Name +
         " read values totaling: " + sum +
         ".\nTerminating " + Thread.CurrentThread.Name + ".\n");
   } // end method Consume
} // end class Consumer
// this class creates producer and consumer threads
class SharedCell
{
   // create producer and consumer threads and start them
   static void Main(string[] args)
   {
      // create shared object used by threads
      HoldIntegerSynchronized holdInteger =
         new HoldIntegerSynchronized();

      // Random instances are not safe for concurrent use, so each thread
      // gets its own generator; the seeds come from one source so the two
      // sequences differ
      Random seedSource = new Random();

      // create Producer and Consumer objects
      // Only one consumer is created: the producer writes 10 values and a
      // consumer reads 10, so any additional consumer thread would block
      // forever unless the producer's count is raised to match.
      Producer producer =
         new Producer(holdInteger, new Random(seedSource.Next()));
      Consumer consumer =
         new Consumer(holdInteger, new Random(seedSource.Next()));

      // output column heads and initial buffer state
      // NOTE(review): the format string was missing from the source and
      // has been reconstructed to line up with DisplayState - confirm
      Console.WriteLine("{0,-35}{1,-9}{2}\n",
         "Operation", "Buffer", "Occupied Count");
      holdInteger.DisplayState("Initial state");

      // create threads for producer and consumer and set
      // delegates for each thread
      Thread producerThread =
         new Thread(new ThreadStart(producer.Produce));
      producerThread.Name = "Producer";

      Thread consumerThread =
         new Thread(new ThreadStart(consumer.Consume));
      consumerThread.Name = "Consumer";

      // start each thread
      producerThread.Start();
      consumerThread.Start();
   } // end method Main
} // end class SharedCell
} // end end!.
using System;
using System.Threading;
namespace MCSharp {
/** <summary>
A thread safe, blocking queue.</summary>
<remarks>
All members of this class are thread safe.</remarks>
*/
public class MessageQueue<T> {
    private LinkedQueue<T> messagequeue=new LinkedQueue<T>();
    // Guards messagequeue and numwaitingthreads; consumers wait on it in Dequeue.
    private Object waitobject=new Object();
    private Int32 numwaitingthreads=0;
    // Signalled on every dequeue; WaitForEmpty callers wait on it.
    private Object emptyobject=new Object();

    /** <summary>
    Returns the number of items currently waiting in the queue.</summary>
    */
    public Int32 MessageCount {
        get { lock (waitobject) return messagequeue.Count; }
    }

    /** <summary>
    Returns the number of threads currently waiting for items to be added to the queue.</summary>
    */
    public Int32 ThreadCount {
        get { lock (waitobject) return numwaitingthreads; }
    }

    /** <summary>
    Creates a new queue.</summary>
    */
    public MessageQueue () { }

    /** <summary>
    Adds a new item to the back of the queue.</summary>
    <param name="message">
    The item to add to the queue.</param>
    */
    public void Enqueue (T message) {
        lock (waitobject) {
            messagequeue.Enqueue(message);
            // One new item can satisfy exactly one blocked Dequeue call.
            Monitor.Pulse(waitobject);
        }
    }

    /** <summary>
    Removes an item from the front of the queue.</summary>
    <remarks>
    If there is currently no item at the front of the queue the thread will block
    until there is one, and then return with that item.</remarks>
    <returns>
    The item from the front of the queue.</returns>
    */
    public T Dequeue () {
        lock (waitobject) {
            // Loop rather than test once: another consumer may take the item
            // between the pulse and this thread reacquiring the lock.
            while (messagequeue.Count==0) {
                numwaitingthreads++;
                try {
                    Monitor.Wait(waitobject);
                } finally {
                    // Wait reacquires the lock before returning or throwing,
                    // so the counter is always updated under waitobject.
                    numwaitingthreads--;
                }
            }
            // Wake WaitForEmpty callers so they re-check the count. They
            // cannot observe it until waitobject is released below, i.e.
            // after the item has actually been removed.
            lock (emptyobject) {
                Monitor.PulseAll(emptyobject);
            }
            return messagequeue.Dequeue();
        }
    }

    /** <summary>
    Waits for the queue to empty.</summary>
    <remarks>
    The calling thread blocks until the queue's <see cref="MCSharp.MessageQueue{T}.MessageCount">
    message count</see> is zero.</remarks>
    */
    public void WaitForEmpty () {
        while (true) {
            Monitor.Enter(waitobject);
            try {
                if (messagequeue.Count==0) {
                    return;
                }
                // Take emptyobject before releasing waitobject so a Dequeue
                // that runs in between cannot pulse before this thread waits.
                Monitor.Enter(emptyobject);
            } finally {
                Monitor.Exit(waitobject);
            }
            try {
                Monitor.Wait(emptyobject);
            } finally {
                Monitor.Exit(emptyobject);
            }
        }
    }
}
using System;
namespace MCSharp {
/** <summary>
Implements a queue based around a singly linked list.</summary>
<remarks>
The .NET's built in queue implementation uses a dynamically-resizing array
for its data storage.</remarks>
*/
public class LinkedQueue<T> {
    // One link in the chain: the stored item plus a reference to the next node.
    private class SinglyLinkedListNode<NodeT> {
        public SinglyLinkedListNode<NodeT> Next=null;
        public NodeT Item;
        public SinglyLinkedListNode (NodeT item) {
            Item=item;
        }
    }

    // head is the front (next item to dequeue); tail is the rear (last enqueued).
    // Both are null when the queue is empty.
    private SinglyLinkedListNode<T> head=null;
    private SinglyLinkedListNode<T> tail=null;
    private Int32 count=0;

    /** <summary>
    Returns the number of items in the queue.</summary>
    */
    public Int32 Count {
        get { return count; }
    }

    /** <summary>
    Creates a new queue.</summary>
    */
    public LinkedQueue () { }

    /** <summary>
    Adds an item to the rear of the queue.</summary>
    <param name="item">
    The item to add to the queue.</param>
    */
    public void Enqueue (T item) {
        SinglyLinkedListNode<T> newnode=new SinglyLinkedListNode<T>(item);
        if (head==null) {
            // Empty queue: the new node is both front and rear.
            head=newnode;
        } else {
            tail.Next=newnode;
        }
        tail=newnode;
        count++;
    }

    /** <summary>
    Removes and returns the item at the front of the queue.</summary>
    <returns>
    The item at the front of the queue.</returns>
    <exception cref="System.InvalidOperationException">
    The queue is empty.</exception>
    */
    public T Dequeue () {
        if (count==0) throw new InvalidOperationException();
        T returnthis=head.Item;
        // Removing the last node: clear tail so it does not keep the node alive.
        if (head.Next==null) tail=null;
        head=head.Next;
        count--;
        return returnthis;
    }
}