可观察到的.Create: CancellationToken不会转换为IsCancellationRequested
本文关键字:转换 IsCancellationRequested CancellationToken 观察 Create | 更新日期: 2023-09-27 18:17:00
使用这个小脚本(在LINQPad中设计,但应该在任何地方运行):
void Main()
{
Task.Run(() => Worker()).Wait();
}
async Task Worker()
{
if (SynchronizationContext.Current != null)
throw new InvalidOperationException("Don't want any synchronization!");
BaseClass provider = new Implementation();
Func<IObserver<TimeSpan>, CancellationToken, Task> subscribeAsync =
provider.CreateValues;
var observable = Observable.Create(subscribeAsync);
var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s
cancellation.Register(() => Console.WriteLine("token is cancelled now"));
await observable
.Do(ts =>
{
Console.WriteLine("Elapsed: {0}; cancelled: {1}",
ts,
cancellation.IsCancellationRequested);
cancellation.ThrowIfCancellationRequested();
})
.ToTask(cancellation)
.ConfigureAwait(false);
}
abstract class BaseClass
{
// allow implementers to use async-await
public abstract Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation);
}
class Implementation : BaseClass
{
// creates Values for 10s; entirely CPU-bound: no way for async-await hence return Task.CompletedTask
public override Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation)
{
try
{
var sw = Stopwatch.StartNew();
for (int i = 0; i < 10; i++)
{
for (int j = 0; j < 3; j++)
{
Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested);
Thread.Sleep(333);
}
if (cancellation.IsCancellationRequested) // !! never gets true !!
throw new ApplicationException("token is cancelled");
observer.OnNext(sw.Elapsed);
}
return Task.CompletedTask;
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
}
}
方法Implementation.CreateValues
保持运行整整10秒,而不是在5.5秒后停止。Observable.Create
传入的CancellationToken
甚至不会过渡到取消状态(原始令牌当然会)!
是bug吗?是我做错了什么吗?
输出是:
0/0 cancelled:False
0/1 cancelled:False
0/2 cancelled:False
Elapsed: 00:00:01.0205951; cancelled: False
1/0 cancelled:False
1/1 cancelled:False
1/2 cancelled:False
Elapsed: 00:00:02.0253279; cancelled: False
2/0 cancelled:False
2/1 cancelled:False
2/2 cancelled:False
Elapsed: 00:00:03.0274035; cancelled: False
3/0 cancelled:False
3/1 cancelled:False
3/2 cancelled:False
Elapsed: 00:00:04.0294796; cancelled: False
4/0 cancelled:False
4/1 cancelled:False
4/2 cancelled:False
Elapsed: 00:00:05.0315332; cancelled: False
5/0 cancelled:False
5/1 cancelled:False
token is cancelled now
5/2 cancelled:False
Elapsed: 00:00:06.0335601; cancelled: True
6/0 cancelled:False
6/1 cancelled:False
6/2 cancelled:False
Elapsed: 00:00:07.0436211; cancelled: True
7/0 cancelled:False
7/1 cancelled:False
7/2 cancelled:False
Elapsed: 00:00:08.0457921; cancelled: True
8/0 cancelled:False
8/1 cancelled:False
8/2 cancelled:False
Elapsed: 00:00:09.0477509; cancelled: True
9/0 cancelled:False
9/1 cancelled:False
9/2 cancelled:False
Elapsed: 00:00:10.0498751; cancelled: True
[AggregateException] at Main/Task.Wait()
传递给subscribeAsync
函数的取消令牌是由Observable.Create
调用实例化的,而不是您正在实例化的取消令牌。
根据Observable.Create
过载摘要:
从指定的可取消对象创建一个可观察序列异步订阅方法。CancellationToken传递给异步订阅方法绑定到返回的一次性对象订阅,允许尽最大努力取消。
简而言之,取消令牌将在您处置订阅时被取消,而不是在指定的延迟之后。
你应该能够像下面这样重构你的代码,使它工作:
Observable.Create(observer => subscribeAsync(observer, cancellation));
希望能有所帮助。
这不是对问题的真正回答,而是使用System. threading . tasks . dataflow代替System. threading . tasks . dataflow重写示例代码。响应式(作为评论发布的代码太多了):
这有几个优点:
- 由于
observer
参数现在是Task
,每个实现都有await
的东西。 - 先前在
Do()
中的处理代码(现在在ActionBlock
中)可以自己异步实现,如果需要的话。如果需要, - 集成缓冲。
- 它是解耦=技术不可知论:我的接口是
Func<TimeSpan, Task<bool>>
,所以没有依赖于Rx或TPL-Dataflow或其他什么。
void Main()
{
Task.Run(() => Worker()).Wait();
Console.WriteLine("DONE");
}
async Task Worker()
{
if (SynchronizationContext.Current != null)
throw new InvalidOperationException("Don't want any synchronization!");
var cancellation = new CancellationTokenSource(55000).Token; // gets cancelled after 5.5s
cancellation.Register(() => Console.WriteLine("token is cancelled now"));
var flow = new ActionBlock<TimeSpan>(
async ts =>
{
Console.WriteLine("[START] Elapsed: {0}; cancelled: {1}", ts, cancellation.IsCancellationRequested);
await Task.Delay(2500).ConfigureAwait(false); // processing takes more time than items need to produce
Console.WriteLine("[STOP] Elapsed: {0}; cancelled: {1}", ts, cancellation.IsCancellationRequested);
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2, // Buffer 1 item ahead
EnsureOrdered = true,
CancellationToken = cancellation,
});
Func<TimeSpan, Task<bool>> observer = ts => flow.SendAsync(ts, cancellation);
BaseClass provider = new Implementation();
await provider.CreateValues(observer, cancellation).ConfigureAwait(false);
Console.WriteLine("provider.CreateValues done");
flow.Complete();
await flow.Completion.ConfigureAwait(false);
Console.WriteLine("flow completed");
}
abstract class BaseClass
{
// allow implementers to use async-await
public abstract Task CreateValues(Func<TimeSpan, Task<bool>> observer, CancellationToken cancellation);
}
class Implementation : BaseClass
{
public override async Task CreateValues(Func<TimeSpan, Task<bool>> observer, CancellationToken cancellation)
{
try
{
var sw = Stopwatch.StartNew();
for (int i = 0; i < 10; i++)
{
for (int j = 0; j < 3; j++)
{
Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested);
Thread.Sleep(333);
}
if (cancellation.IsCancellationRequested)
throw new ApplicationException("token is cancelled");
var value = sw.Elapsed;
var queued = await observer(value); // use of "observer" encorces async-await even if there is nothing else async
Console.WriteLine("[{0}] '{1}' @ {2}", queued ? "enqueued" : "skipped", value, sw.Elapsed);
if (!queued)
; // Dispose item
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
}
}