如何防止任务上的同步延续
本文关键字:同步 延续 何防止 任务 | 更新日期: 2023-09-27 17:51:16
我有一些库(套接字网络)代码,它提供了基于Task
的API,用于基于TaskCompletionSource<T>
的挂起请求响应。然而,TPL中有一个令人烦恼的地方,似乎无法阻止同步延续。我希望能够做到的是:
- 告诉
TaskCompletionSource<T>
不允许调用者附加TaskContinuationOptions.ExecuteSynchronously
,或者 - 设置结果(
SetResult
/TrySetResult
),指定TaskContinuationOptions.ExecuteSynchronously
应该被忽略,使用池代替
具体来说,我遇到的问题是传入的数据是由专用的读取器处理的,如果调用者可以附加TaskContinuationOptions.ExecuteSynchronously
,他们可以使读取器停止(这影响的不仅仅是他们)。以前,我已经通过一些hack技术来解决这个问题,检测是否存在任何延续,如果存在,则将完成推到ThreadPool
上,但是如果调用者的工作队列已经饱和,这将产生重大影响,因为完成将无法及时得到处理。如果它们使用的是Task.Wait()
(或类似的),那么它们实际上会自己死锁。同样,这也是为什么reader在一个专用线程上,而不是使用worker。
;在我试着唠叨TPL团队之前:我错过了一个选择吗?
重点:
- 我不希望外部调用者能够劫持我的线程
- 我不能使用
ThreadPool
作为实现,因为它需要在池饱和时工作
下面的示例产生输出(顺序可能因时间而异):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
问题是一个随机调用者设法在"主线程"上获得延续。在实际代码中,这将中断主阅读器;不好的事情!
代码:using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
新在。net 4.6:
。. NET 4.6包含一个新的TaskCreationOptions
: RunContinuationsAsynchronously
.
既然你愿意使用反射来访问私有字段…
你可以用TASK_STATE_THREAD_WAS_ABORTED
标记TCS的任务,这将导致所有的延续不被内联。
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
编辑:我建议您使用表达式,而不是使用Reflection emit。这样可读性更强,并且具有与pcl兼容的优点:
var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
Expression.Lambda<Action<Task>>(
Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
不使用反射:
如果有人感兴趣,我已经找到了一种不需要反射的方法,但它也有点"脏",当然也带来了不可忽略的性能损失:
try
{
Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
source.TrySetResult(123);
Thread.ResetAbort();
}
我认为TPL中没有任何东西可以为TaskCompletionSource.SetResult
的延续提供显式的 API控制。我决定保留我最初的答案来控制async/await
场景中的这种行为。
这是另一种对ContinueWith
施加异步的解决方案,如果tcs.SetResult
触发的延续发生在调用SetResult
的同一个线程上:
public static class TaskExt
{
static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
new ConcurrentDictionary<Task, Thread>();
// SetResultAsync
static public void SetResultAsync<TResult>(
this TaskCompletionSource<TResult> @this,
TResult result)
{
s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
try
{
@this.SetResult(result);
}
finally
{
Thread thread;
s_tcsTasks.TryRemove(@this.Task, out thread);
}
}
// ContinueWithAsync, TODO: more overrides
static public Task ContinueWithAsync<TResult>(
this Task<TResult> @this,
Action<Task<TResult>> action,
TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
{
return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
{
Thread thread = null;
s_tcsTasks.TryGetValue(t, out thread);
if (Thread.CurrentThread == thread)
{
// same thread which called SetResultAsync, avoid potential deadlocks
// using thread pool
return Task.Run(() => action(t));
// not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
// return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
}
else
{
// continue on the same thread
var task = new Task(() => action(t));
task.RunSynchronously();
return Task.FromResult(task);
}
}), continuationOptions).Unwrap();
}
}
更新以解决注释:
我不能控制来电者-我不能让他们使用特定的如果我可以,这个问题就不会存在于首先
我不知道你不能控制来电者。然而,如果您不控制它,您可能也不会将TaskCompletionSource
对象直接传递给调用者。逻辑上,您将传递其中的令牌部分,即tcs.Task
。在这种情况下,通过在上面的方法中添加另一个扩展方法,解决方案可能会更简单:
// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
{
Thread thread = null;
s_tcsTasks.TryGetValue(antecedent, out thread);
if (Thread.CurrentThread == thread)
{
// continue on a pool thread
return antecedent.ContinueWith(t => t,
TaskContinuationOptions.None).Unwrap();
}
else
{
return antecedent;
}
}), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
使用:
// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ...
// client code
task.ContinueWith(delegate
{
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
// ...
// library code
source.SetResultAsync(123);
这实际上适用于await
和ContinueWith
(fiddle),并且没有反射hack。
不做
var task = source.Task;
你可以这样做
var task = source.Task.ContinueWith<Int32>( x => x.Result );
因此,您总是添加一个将异步执行的延续,然后订阅者是否需要在相同上下文中进行延续并不重要。这是在敷衍任务,不是吗?
模拟中止方法看起来非常好,但是在某些情况下会导致TPL劫持线程。
然后我有了一个类似于检查延续对象的实现,但只是检查任何延续,因为实际上有太多的场景可以让给定的代码正常工作,但这意味着即使像Task.Wait
这样的事情也会导致线程池查找。
最终,在检查了大量的IL之后,唯一的安全和有用的场景是SetOnInvokeMres
场景(手动复位-事件-微小延续)。还有很多其他的场景:
- 一些不安全,并导致线程劫持
- 其余的都没有用,因为它们最终会导致线程池
所以最后,我选择检查一个非空的continuation对象;如果为空,则fine(不继续);如果非空,则检查SetOnInvokeMres
的特殊情况-如果是:fine(可以安全调用);否则,让线程池执行TrySetComplete
,而不告诉任务做任何特殊的事情,比如欺骗中止。Task.Wait
使用SetOnInvokeMres
的方法,这是我们想尝试真的很难不死锁的具体场景。
Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
var il = method.GetILGenerator();
var hasContinuation = il.DefineLabel();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, continuationField);
Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
// check if null
il.Emit(OpCodes.Brtrue_S, nonNull);
il.MarkLabel(goodReturn);
il.Emit(OpCodes.Ldc_I4_1);
il.Emit(OpCodes.Ret);
// check if is a SetOnInvokeMres - if so, we're OK
il.MarkLabel(nonNull);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, continuationField);
il.Emit(OpCodes.Isinst, safeScenario);
il.Emit(OpCodes.Brtrue_S, goodReturn);
il.Emit(OpCodes.Ldc_I4_0);
il.Emit(OpCodes.Ret);
IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
如果你可以并且准备好使用反射,这应该可以做到;
public static class MakeItAsync
{
static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
{
var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
var continuations = (List<object>)continuation.GetValue(source.Task);
foreach (object c in continuations)
{
var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
var options = (TaskContinuationOptions)option.GetValue(c);
options &= ~TaskContinuationOptions.ExecuteSynchronously;
option.SetValue(c, options);
}
source.TrySetResult(result);
}
}
更新,我发布了一个单独的答案来处理ContinueWith
而不是await
(因为ContinueWith
不关心当前的同步上下文)。
您可以使用一个哑同步上下文对通过调用TaskCompletionSource
上的SetResult/SetCancelled/SetException
触发的延续施加异步。我相信当前的同步上下文(在await tcs.Task
点)是TPL用来决定是否使这种延续同步或异步的标准。
以下内容适合我:
if (notifyAsync)
{
tcs.SetResultAsync(null);
}
else
{
tcs.SetResult(null);
}
SetResultAsync
实现如下:
public static class TaskExt
{
static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
{
FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
}
// FakeSynchronizationContext
class FakeSynchronizationContext : SynchronizationContext
{
private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());
private FakeSynchronizationContext() { }
public static FakeSynchronizationContext Instance { get { return s_context.Value; } }
public static void Execute(Action action)
{
var savedContext = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
try
{
action();
}
finally
{
SynchronizationContext.SetSynchronizationContext(savedContext);
}
}
// SynchronizationContext methods
public override SynchronizationContext CreateCopy()
{
return this;
}
public override void OperationStarted()
{
throw new NotImplementedException("OperationStarted");
}
public override void OperationCompleted()
{
throw new NotImplementedException("OperationCompleted");
}
public override void Post(SendOrPostCallback d, object state)
{
throw new NotImplementedException("Post");
}
public override void Send(SendOrPostCallback d, object state)
{
throw new NotImplementedException("Send");
}
}
}
SynchronizationContext.SetSynchronizationContext
在增加开销方面非常便宜。实际上,WPF Dispatcher.BeginInvoke
的实现采用了非常相似的方法。
TPL比较await
点和tcs.SetResult
点的目标同步上下文。如果同步上下文相同(或者两个地方都没有同步上下文),则直接以同步方式调用延续。否则,它将在目标同步上下文中使用SynchronizationContext.Post
进行排队,即正常的await
行为。这种方法所做的是总是强制SynchronizationContext.Post
行为(或者如果没有目标同步上下文,则强制执行池线程延续)。
更新了,这对task.ContinueWith
不起作用,因为ContinueWith
不关心当前的同步上下文。然而,它适用于await task
(小提琴)。它也适用于await task.ConfigureAwait(false)
。
OTOH,此方法适用于ContinueWith
.