如何将实现生产者消费者管道的类公开给API中的客户端代码

本文关键字:API 代码 客户端 实现 生产者 消费者 管道 | 更新日期: 2023-09-27 17:51:05

我对如何将实现生产者消费者管道的类公开给客户端代码有点困惑。

假设我有两个类分别代表生产者和消费者,如下所示:

public class Consumer
{
  ...
  public void Cosume();
  ...
}

制片人:

public class Producer
{
  ...
  public void Produce();
  ...
}

有第三类协调生产者和消费者(顺便说一下,问题来了)

public class ProducerConsumer
{
  ...
  private Producer producer;
  private Consumer consumer;
  ...
  public void Start()
  {
     ...
  }
}

如何实现start?

我想调用producer.Produce()和consumer.Consume()封装在Task中。运行并等待它们完成,如下所示:

public async Task Start()
{
     await Task.WhenAll(Task.Run(() => producer.Produce(), Task.Run(() => consumer.Consume());
}

但是我读到在实现中使用Task.Run()不是一个很好的实践,所以我放弃了它。

我还想到了Parallel.Invoke(),并将将Parallel.Invoke()的阻塞等待留给另一个线程的责任留给客户机代码,如下所示:

public void Start()
{
      Parallel.Invoke(() => producer.Produce(), () => consumer.Consume());
}

客户端代码会这样做:

public async void ButtonHandler(object sender, RoutedEventArgs e)
{
   await Task.Run(() => producerConsumer.Start());
}

但是卸载到另一个线程只是为了等待另外两个线程完成对我来说有点奇怪,因为我浪费了一个线程只是为了等待一些东西。

在阅读了StackOverflow上的其他问题后,许多人建议将如何调用并行代码的责任留给调用代码,因此我认为从ProducerConsumer类中暴露Producer和Consumer,并让客户端代码以它想要的方式调用Producer. produce()和Consumer. consume(),或多或少像:

public async void ButtonHandler(object sender, RoutedEventArgs e)
{
   Task producerTask = Task.Run(() => producerConsumer.Producer.Produce());
   Task consumerTask = Task.Run(() => producerConsumer.Consumer.Consumer());
   await Task.WhenAll(producerTask, consumerTask);
}

但是最后一个实现对我来说看起来很尴尬,因为调用者代码负责调用Produce()和Consume(),如果省略了两个方法中的一个,可能会导致错误。

我知道TPL。DataFlow来实现生产者和消费者管道,但我不能添加外部依赖到库。

那么我应该如何编写Start()方法呢?更一般地说:我应该如何将本质上是并行的代码暴露给库客户端代码?

如何将实现生产者消费者管道的类公开给API中的客户端代码

如果你有一个async兼容的生产者/消费者队列(例如,来自TPL Dataflow的BufferBlock<T>),并且你的生产者是节流的(例如,队列有一个合理的最大元素数量,或者生产者的数据来自一些I/O操作),那么你可以让你的生产者和消费者async,并直接像这样调用它们:

public async Task ExecuteAsync()
{
  await Task.WhenAll(producer.ProduceAsync(), consumer.ConsumeAsync())
      .ConfigureAwait(false);
}

否则,你将不得不在某处"给予"。如果你的生产者和消费者是同步的,那么你的包含类——根据定义——控制两个线程。在这种情况下,可以使用Task.Run:

public async Task ExecuteAsync()
{
  await Task.WhenAll(Task.Run(() => producer.Produce()), Task.Run(() => consumer.Consume()))
      .ConfigureAwait(false);
}