用有限的订阅者并发订阅可观察集合的简单方法
本文关键字:观察 集合 方法 简单 并发 | 更新日期: 2023-09-27 18:05:25
我一直在尝试使用Rx和可观察集合实现一个简单的生产者-消费者模式。我还需要能够节流订户的数量很容易。我在并行扩展中看到了很多对LimitedConcurrencyLevelTaskScheduler的引用,但我似乎无法使用多个线程。
我想我在做一些愚蠢的事情,所以我希望有人能解释一下。在下面的单元测试中,我希望使用多(2)个线程来使用阻塞集合中的字符串。我做错了什么?
[TestClass]
public class LimitedConcurrencyLevelTaskSchedulerTestscs
{
private ConcurrentBag<string> _testStrings = new ConcurrentBag<string>();
ConcurrentBag<int> _threadIds= new ConcurrentBag<int>();
[TestMethod]
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
// Setup the command queue for processing combinations
var commandQueue = new BlockingCollection<string>();
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });
var iterationCount = 100;
for (int i = 0; i < iterationCount; i++)
{
commandQueue.Add(string.Format("string {0}", i));
}
commandQueue.CompleteAdding();
while (!commandQueue.IsCompleted)
{
Thread.Sleep(100);
}
Assert.AreEqual(iterationCount, _testStrings.Count);
Assert.AreEqual(2, _threadIds.Distinct().Count());
}
private void Go(string testString)
{
_testStrings.Add(testString);
_threadIds.Add(Thread.CurrentThread.ManagedThreadId);
}
}
每个人似乎都经历了相同的Rx学习曲线。需要理解的是,Rx不执行并行处理,除非您显式地执行强制并行的查询。调度程序不引入并行性。
Rx有一个行为契约,表示0或多个值是连续产生的(不管可能使用多少线程),一个接一个,没有重叠,最后是一个可选的单个错误或单个完整消息,然后没有其他。
通常写成OnNext*(OnError|OnCompleted)
。
调度程序所做的就是定义一个规则,如果调度程序没有处理当前可观察对象的待挂值,则决定在上处理哪个线程的新值。
现在输入你的代码:
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
表示调度程序将在两个线程中的一个上运行订阅的值。但这并不意味着它会为所创造的每一个价值都这样做。请记住,由于值是连续地、一个接一个地产生的,因此最好重用现有线程,而不是花费高昂的成本创建一个新线程。因此,Rx所做的是重用现有线程,如果在当前值完成处理之前在调度程序上调度了一个新值。
这是键-如果在现有值的处理完成之前调度了一个新值,它会重用线程。
所以你的代码是这样做的:
commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });
这意味着调度程序只会在第一个值出现时创建线程。但是,当昂贵的线程创建操作完成时,向commandQueue
添加值的代码也完成了,因此它已经将所有值排队,因此它可以更有效地使用单个线程,而不是创建昂贵的第二个线程。
为了避免这种情况,您需要构造查询来引入并行性。
方法如下:
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
var iterationCount = 100;
Observable
.Range(0, iterationCount)
.SelectMany(n => Observable.Start(() => n.ToString(), scheduler)
.Do(x => Go(x)))
.Wait();
(iterationCount == _testStrings.Count).Dump();
(2 == _threadIds.Distinct().Count()).Dump();
}
现在,我已经使用Do(...)
/.Wait()
组合来为您提供相当于阻塞.Subscribe(...)
方法的等效方法。
结果是你的断言都返回true。
我发现,通过如下方式修改订阅,我可以添加5个订阅者,但只有两个线程将处理集合的内容,因此这满足了我的目的。
for(int i = 0; i < 5; i++)
observable.Subscribe(Go, ex => { throw ex; });
我很想知道是否有更好或更优雅的方法来实现这一点!