Rx-如何创建IOobservable<;T>;来自任务<;T>;使得取消预订取消了任务
本文关键字:gt 任务 取消 lt Rx- 何创建 创建 IOobservable | 更新日期: 2023-09-27 18:29:11
我是Rx的新手,请耐心等待。
我想把Task<T>
包装在IObservable<T>
中。到目前为止还不错:
Task<T> task = Task.Factory.StartNew(...);
IObservable<T> obs = task.ToObservable();
现在,我想要的是在观察者取消订阅时发出取消任务的信号:
var cancel = new CancellationToken();
Task<T> task = Task.Factory.StartNew(..., cancel);
IObservable<T> obs = task.ToObservable(); //there should be a way to tie the cancel token
//to the IObservable (?)
IDisposable disposable = obs.Subscribe(...);
Thread.Sleep(1000);
disposable.Dispose(); // this should signal the task to cancel
我该怎么做?
FWIW这是产生这个切线的场景:Rx和任务-当新任务产生时取消运行任务?
以下是我能想到的最简单的方法,使用Observable.Create
:
static IObservable<int> SomeRxWork()
{
return Observable.Create<int>(o =>
{
CancellationTokenSource cts = new CancellationTokenSource();
IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o);
return new CompositeDisposable(sub, new CancellationDisposable(cts));
});
}
static Task<int> SomeAsyncWork(CancellationToken token);
我在评论中暗示的最初方式实际上相当冗长:
static IObservable<int> SomeRxWork()
{
return Observable.Create<int>(async (o, token) =>
{
try
{
o.OnNext(await SomeAsyncWork(token));
o.OnCompleted();
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
o.OnError(ex);
}
});
}
假设您有一个这样的方法:
Task<Gizmo> GetGizmoAsync(int id, CancellationToken cancellationToken);
您可以将其转换为IObservable<Gizmo>
,其中订阅将启动Task<Gizmo>
,取消订阅将使用以下命令取消它。
IObservable<Gizmo> observable = Observable.FromAsync(
cancellationToken => GetGizmoAsync(7, cancellationToken));
// Starts the task:
IDisposable subscription = observable.Subscribe(...);
// Cancels the task if it is still running:
subscription.Dispose();