C#监视器/信号机并发为缓冲区生成使用者
本文关键字:缓冲区 使用者 并发 监视器 信号机 | 更新日期: 2023-09-27 18:24:31
我正在解决一个典型的生产者-消费者问题。我有多个生产者和一个消费者。有n生产者线程,每个线程调用SetOne(OrderObject order),消费者调用GetOne()uffer类用作包含<n个单元格,是指Buffer类可用时消耗的每个单元格。出于某种原因,下面的设置有时可以工作,但并不总是占用所有单元格。我已经包含了所有涉及客户端、服务器和缓冲区的类。此外,我可以展示一个运行这个原型的简单原型。仅供参考-使用的方法是首先将信号量初始化为与所用缓冲区大小相同的大小,然后在缓冲区打开后,找到一个打开的单元格,然后对该单元格执行操作。
public class Buffer
{
public static OrderObject[] BufferCells;
public static Semaphore _pool { get; set; }
public static void SetOne(OrderObject order)
{
_pool.WaitOne();
try
{
Monitor.Enter(BufferCells);
for (int i = 0; i < BufferCells.Length - 1; i++)
{
BufferCells[i] = order;
Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
}
}
finally
{
Monitor.Exit(BufferCells);
_pool.Release();
}
}
public static OrderObject GetOne()
{
_pool.WaitOne();
OrderObject value = null;
try
{
Monitor.Enter(BufferCells);
for (int i = 0; i < BufferCells.Length - 1; i++)
{
if (BufferCells[i].Id != "-1")
{
value = BufferCells[i];
BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
Console.WriteLine(String.Format(" Server Consumed {0}", value.Id));
break;
}
}
}
finally
{
Monitor.Exit(BufferCells);
_pool.Release();
}
return value;
}
}
public class Client
{
public int Id {get;set;}
public void Run()
{
/*Produce*/
Buffer.SetOne(Id);
}
}
public class Server
{
public void Run()
{
while(true)
{
Buffer.GetOne();
}
}
}
public class Program
{
public static void Main(string[] args)
{
/*Initialize 2 Open Semaphores*/
int numCells = 2;
Semaphore pool = new Semaphore(numCells, numCells);
/*Initialize BufferCells with Empty OrderObjects*/
List<OrderObject> OrderObjects = new List<OrderObject>();
for (var i = 0; i < numCells; i++)
{
OrderObjects.Add(new OrderObject() { Id = "-1" });
}
Buffer.BufferCells = OrderObjects.ToArray();
/*Initialize Consumer Thread*/
Server server = new Server(pool);
Thread serverThread = new Thread(new ThreadStart(server.Run));
/*Initialize Producer Objects*/
List<Client> clients = new List<Client>();
for (int i = 0; i <= 20; i++)
{
/*Create 5000 Clients*/
Client client = new Client(i.ToString(), pool, new OrderObject() { Id = i.ToString() });
clients.Add(client);
}
/*Start Each Client Thread*/
List<Thread> clientThreads = new List<Thread>();
foreach (var client in clients)
{
Thread t = new Thread(new ThreadStart(client.Run));
clientThreads.Add(t);
}
/*Start Server Thread*/
serverThread.Start();
/*Start Each Producer Thread*/
clientThreads.ForEach(p => p.Start());
/*Start Consumer Thread*/
Console.ReadLine();
}
}
我猜我遇到了以下问题之一:死锁、Livelock或饥饿。由于某些原因,Server无法使用生成并添加到单元格缓冲区的所有订单对象。不确定修复是什么。
好的,让我们分解一下这个代码试图做的事情。
public class Buffer
{
public static OrderObject[] BufferCells;
public static Semaphore _pool { get; set; }
//设置一个
public static void SetOne(OrderObject order)
{
//为什么你这里需要一个信号灯?
_pool.WaitOne();
try
{
//Semaphore是多余的,因为Monitor。Enter是一个限制性更强的锁。
Monitor.Enter(BufferCells);
//嗯?我以为这应该是SetOne?不是SetEverything?我只能假设你的意图是设置其中一个单元格,而让其他单元格可以设置或获取。如果您正试图实现这一点,那么队列似乎是一种更合适的数据结构。更好的是BlockingCollection
也启用了锁定/阻塞机制。
for (int i = 0; i < BufferCells.Length - 1; i++)
{
BufferCells[i] = order;
Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
}
}
finally
{
Monitor.Exit(BufferCells);
_pool.Release();
}
}
public static OrderObject GetOne()
{
//同样,这个信号量在这里似乎没有太大帮助
_pool.WaitOne();
OrderObject value = null;
try
{
//因为监视器是一个限制性更强的锁定
Monitor.Enter(BufferCells);
for (int i = 0; i < BufferCells.Length - 1; i++)
{
if (BufferCells[i].Id != "-1")
{
value = BufferCells[i];
BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
Console.WriteLine(String.Format(" Server Consumed {0}", value.Id));
break;
}
}
}
finally
{
Monitor.Exit(BufferCells);
_pool.Release();
}
return value;
}
}
综上所述:
- 此代码使用冗余锁定,这是不必要的
- 这段代码将缓冲区中的所有单元格一次性设置为独占锁,这似乎首先破坏了缓冲区的目的
- 这段代码试图实现的目标似乎已经在
BlockingCollection
中实现了。无需重新发明轮子!:)
祝你好运!
我假设监视器是为了保护对数组的访问,信号量应该进行计数,是吗?
如果是这样,则不应在getOne()的finally部分调用"_pool.Release()",也不应在setOne()顶部调用"_pool.WWaitOne()"。生产者的工作是在信号量推送对象后发出信号,消费者的工作是等待信号量再弹出对象。
Aarrgghh!
说:"使用的方法是首先将信号量初始化为与所用缓冲区大小相同的大小。"
如果您想要一个无边界队列,请将信号量初始化为0。
如果你想要一个有界队列,正如上面的文本所暗示的那样,你需要两个信号量(以及监视器),一个是"I",用于计算队列中的项目,一个"S",用于统计队列中剩余的空间。将I初始化为0,将S初始化为队列大小。在生产者中,等待S,锁定,按下,解锁,信号I。在消费者中,等待I,锁定,弹出,解锁,S。