如何防止任务上的同步延续

本文关键字:同步 延续 何防止 任务 | 更新日期: 2023-09-27 17:51:16

我有一些库(套接字网络)代码,它提供了基于Task的API,用于基于TaskCompletionSource<T>的挂起请求响应。然而,TPL中有一个令人烦恼的地方,似乎无法阻止同步延续。我希望能够做到的是:

  • 告诉TaskCompletionSource<T>不允许调用者附加TaskContinuationOptions.ExecuteSynchronously,或者
  • 设置结果(SetResult/TrySetResult),指定TaskContinuationOptions.ExecuteSynchronously应该被忽略,使用池代替

具体来说,我遇到的问题是传入的数据是由专用的读取器处理的,如果调用者可以附加TaskContinuationOptions.ExecuteSynchronously,他们可以使读取器停止(这影响的不仅仅是他们)。以前,我已经通过一些hack技术来解决这个问题,检测是否存在任何延续,如果存在,则将完成推到ThreadPool上,但是如果调用者的工作队列已经饱和,这将产生重大影响,因为完成将无法及时得到处理。如果它们使用的是Task.Wait()(或类似的),那么它们实际上会自己死锁。同样,这也是为什么reader在一个专用线程上,而不是使用worker。

;在我试着唠叨TPL团队之前:我错过了一个选择吗?

重点:

  • 我不希望外部调用者能够劫持我的线程
  • 我不能使用ThreadPool作为实现,因为它需要在池饱和时工作

下面的示例产生输出(顺序可能因时间而异):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

问题是一个随机调用者设法在"主线程"上获得延续。在实际代码中,这将中断主阅读器;不好的事情!

代码:

using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

如何防止任务上的同步延续

新在。net 4.6:

。. NET 4.6包含一个新的TaskCreationOptions: RunContinuationsAsynchronously .


既然你愿意使用反射来访问私有字段…

你可以用TASK_STATE_THREAD_WAS_ABORTED标记TCS的任务,这将导致所有的延续不被内联。

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
编辑:

我建议您使用表达式,而不是使用Reflection emit。这样可读性更强,并且具有与pcl兼容的优点:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

不使用反射:

如果有人感兴趣,我已经找到了一种不需要反射的方法,但它也有点"脏",当然也带来了不可忽略的性能损失:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

我认为TPL中没有任何东西可以为TaskCompletionSource.SetResult的延续提供显式的 API控制。我决定保留我最初的答案来控制async/await场景中的这种行为。

这是另一种对ContinueWith施加异步的解决方案,如果tcs.SetResult触发的延续发生在调用SetResult的同一个线程上:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();
    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }
    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks
                // using thread pool
                return Task.Run(() => action(t));
                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

更新以解决注释:

我不能控制来电者-我不能让他们使用特定的如果我可以,这个问题就不会存在于首先

我不知道你不能控制来电者。然而,如果您不控制它,您可能也不会将TaskCompletionSource对象直接传递给调用者。逻辑上,您将传递其中的令牌部分,即tcs.Task。在这种情况下,通过在上面的方法中添加另一个扩展方法,解决方案可能会更简单:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

使用:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 
// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
// ...
// library code
source.SetResultAsync(123);

这实际上适用于awaitContinueWith (fiddle),并且没有反射hack。

不做

var task = source.Task;

你可以这样做

var task = source.Task.ContinueWith<Int32>( x => x.Result );

因此,您总是添加一个将异步执行的延续,然后订阅者是否需要在相同上下文中进行延续并不重要。这是在敷衍任务,不是吗?

模拟中止方法看起来非常好,但是在某些情况下会导致TPL劫持线程。

然后我有了一个类似于检查延续对象的实现,但只是检查任何延续,因为实际上有太多的场景可以让给定的代码正常工作,但这意味着即使像Task.Wait这样的事情也会导致线程池查找。

最终,在检查了大量的IL之后,唯一的安全和有用的场景是SetOnInvokeMres场景(手动复位-事件-微小延续)。还有很多其他的场景:

  • 一些不安全,并导致线程劫持
  • 其余的都没有用,因为它们最终会导致线程池

所以最后,我选择检查一个非空的continuation对象;如果为空,则fine(不继续);如果非空,则检查SetOnInvokeMres的特殊情况-如果是:fine(可以安全调用);否则,让线程池执行TrySetComplete,而不告诉任务做任何特殊的事情,比如欺骗中止。Task.Wait使用SetOnInvokeMres的方法,这是我们想尝试真的很难不死锁的具体场景。

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);
    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);
    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);
    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

如果你可以并且准备好使用反射,这应该可以做到;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);
        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);
            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }
        source.TrySetResult(result);
    }        
}

更新,我发布了一个单独的答案来处理ContinueWith而不是await(因为ContinueWith不关心当前的同步上下文)。

您可以使用一个哑同步上下文对通过调用TaskCompletionSource上的SetResult/SetCancelled/SetException触发的延续施加异步。我相信当前的同步上下文(在await tcs.Task点)是TPL用来决定是否使这种延续同步或异步的标准。

以下内容适合我:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync实现如下:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }
    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());
        private FakeSynchronizationContext() { }
        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }
        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }
        // SynchronizationContext methods
        public override SynchronizationContext CreateCopy()
        {
            return this;
        }
        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }
        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }
        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }
        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext在增加开销方面非常便宜。实际上,WPF Dispatcher.BeginInvoke的实现采用了非常相似的方法。

TPL比较await点和tcs.SetResult点的目标同步上下文。如果同步上下文相同(或者两个地方都没有同步上下文),则直接以同步方式调用延续。否则,它将在目标同步上下文中使用SynchronizationContext.Post进行排队,即正常的await行为。这种方法所做的是总是强制SynchronizationContext.Post行为(或者如果没有目标同步上下文,则强制执行池线程延续)。

更新了,这对task.ContinueWith不起作用,因为ContinueWith不关心当前的同步上下文。然而,它适用于await task(小提琴)。它也适用于await task.ConfigureAwait(false)

OTOH,此方法适用于ContinueWith .