使用 async/await 调用 WCF 服务的模式

本文关键字:服务 模式 WCF 调用 async await 使用 | 更新日期: 2023-09-27 18:28:10

我用基于任务的操作生成了一个代理。

应该如何使用 async/await 正确调用此服务(之后释放ServiceClientOperationContext

我的第一次尝试是:

public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
    using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
    {
        return await helper.Proxy.GetHomeInfoAsync(timestamp);
    }
}

ServiceHelper创建ServiceClientOperationContextScope并在之后处置它们的类:

try
{
    if (_operationContextScope != null)
    {
        _operationContextScope.Dispose();
    }
    if (_serviceClient != null)
    {
        if (_serviceClient.State != CommunicationState.Faulted)
        {
            _serviceClient.Close();
        }
        else
        {
            _serviceClient.Abort();
        }
    }
}
catch (CommunicationException)
{
    _serviceClient.Abort();
}
catch (TimeoutException)
{
    _serviceClient.Abort();
}
catch (Exception)
{
    _serviceClient.Abort();
    throw;
}
finally
{
    _operationContextScope = null;
    _serviceClient = null;
}

但是,当同时调用两个服务时,这惨遭失败,并出现以下错误:"此 OperationContextScope 正在与创建时不同的线程上释放。

MSDN 说:

不要在 OperationContextScope 块中使用异步"await"模式。当延续发生时,它可能在不同的线程上运行,并且 OperationContextScope 是特定于线程的。如果需要为异步调用"await",请在 OperationContextScope 块之外使用它。

所以这就是问题所在!但是,我们如何正确修复它?

这家伙按照MSDN所说的做了:

private async void DoStuffWithDoc(string docId)
{
   var doc = await GetDocumentAsync(docId);
   if (doc.YadaYada)
   {
        // more code here
   }
}
public Task<Document> GetDocumentAsync(string docId)
{
  var docClient = CreateDocumentServiceClient();
  using (new OperationContextScope(docClient.InnerChannel))
  {
    return docClient.GetDocumentAsync(docId);
  }
}

我对他的代码的问题是,他从不在ServiceClient上调用Close(或Abort(。

我还找到了一种使用自定义SynchronizationContext传播OperationContextScope的方法。但是,除了它有很多"风险"代码之外,他指出:

值得注意的是,它确实有一些关于操作上下文范围的处置的小问题(因为它们只允许您在调用线程上处置它们(,但这似乎不是问题,因为(至少根据反汇编(,它们实现了 Dispose(( 而不是 Finalize((。

那么,我们在这里不走运吗?是否有一种经过验证的模式来调用 WCF 服务,使用 async/await AND 释放ServiceClientOperationContextScope?也许Microsoft中的某个人(也许是大师斯蒂芬·图布:)(可以提供帮助。

谢谢!

[更新]

在用户Noseratio的大力帮助下,我想出了一些有用的方法:不要使用OperationContextScope。如果出于上述任何原因使用它,请尝试找到适合你的方案的解决方法。否则,如果你真的,真的,需要OperationContextScope,你将不得不想出一个捕获它的SynchronizationContext的实现,这似乎非常困难(如果可能的话 - 一定有一个原因为什么这不是默认行为(。

因此,完整的工作代码是:

public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
    using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
    {
        return await helper.Proxy.GetHomeInfoAsync(timestamp);
    }
}

ServiceHelper是:

public class ServiceHelper<TServiceClient, TService> : IDisposable
    where TServiceClient : ClientBase<TService>, new()
    where TService : class
{
protected bool _isInitialized;
    protected TServiceClient _serviceClient;
    public TServiceClient Proxy
    {
        get
        {
            if (!_isInitialized)
            {
                Initialize();
                _isInitialized = true;
            }
            else if (_serviceClient == null)
            {
                throw new ObjectDisposedException("ServiceHelper");
            }
            return _serviceClient;
        }
    }
    protected virtual void Initialize()
    {
        _serviceClient = new TServiceClient();
    }
    // Implement IDisposable.
    // Do not make this method virtual.
    // A derived class should not be able to override this method.
    public void Dispose()
    {
        Dispose(true);
        // Take yourself off the Finalization queue 
        // to prevent finalization code for this object
        // from executing a second time.
        GC.SuppressFinalize(this);
    }
    // Dispose(bool disposing) executes in two distinct scenarios.
    // If disposing equals true, the method has been called directly
    // or indirectly by a user's code. Managed and unmanaged resources
    // can be disposed.
    // If disposing equals false, the method has been called by the 
    // runtime from inside the finalizer and you should not reference 
    // other objects. Only unmanaged resources can be disposed.
    protected virtual void Dispose(bool disposing)
    {
        // If disposing equals true, dispose all managed 
        // and unmanaged resources.
        if (disposing)
        {
            try
            {
                if (_serviceClient != null)
                {
                    if (_serviceClient.State != CommunicationState.Faulted)
                    {
                        _serviceClient.Close();
                    }
                    else
                    {
                        _serviceClient.Abort();
                    }
                }
            }
            catch (CommunicationException)
            {
                _serviceClient.Abort();
            }
            catch (TimeoutException)
            {
                _serviceClient.Abort();
            }
            catch (Exception)
            {
                _serviceClient.Abort();
                throw;
            }
            finally
            {
                _serviceClient = null;
            }
        }
    }
}

请注意,该类支持扩展;也许您需要继承并提供凭据。

唯一可能的"陷阱"是 在 GetHomeInfoAsync 中,您不能只返回从代理获得的Task(这看起来很自然,为什么要在已经拥有新Task时创建新(。好吧,在这种情况下,您需要await代理Task然后关闭(或中止(ServiceClient,否则您将在调用服务后立即关闭它(同时通过网络发送字节(!

好的,我们有办法让它工作,但正如Noseratio所说,从权威来源得到答案会很好。

使用 async/await 调用 WCF 服务的模式

我认为一个可行的解决方案可能是使用自定义等待器通过OperationContext.Current来流动新的操作上下文。OperationContext本身的实现似乎不需要线程相关性。这是模式:

async Task TestAsync()
{
    using(var client = new WcfAPM.ServiceClient())
    using (var scope = new FlowingOperationContextScope(client.InnerChannel))
    {
        await client.SomeMethodAsync(1).ContinueOnScope(scope);
        await client.AnotherMethodAsync(2).ContinueOnScope(scope);
    }
}

以下是FlowingOperationContextScopeContinueOnScope的实现(仅略有测试(:

public sealed class FlowingOperationContextScope : IDisposable
{
    bool _inflight = false;
    bool _disposed;
    OperationContext _thisContext = null;
    OperationContext _originalContext = null;
    public FlowingOperationContextScope(IContextChannel channel):
        this(new OperationContext(channel))
    {
    }
    public FlowingOperationContextScope(OperationContext context)
    {
        _originalContext = OperationContext.Current;
        OperationContext.Current = _thisContext = context;
    }
    public void Dispose()
    {
        if (!_disposed)
        {
            if (_inflight || OperationContext.Current != _thisContext)
                throw new InvalidOperationException();
            _disposed = true;
            OperationContext.Current = _originalContext;
            _thisContext = null;
            _originalContext = null;
        }
    }
    internal void BeforeAwait()
    {
        if (_inflight)
            return;
        _inflight = true;
        // leave _thisContext as the current context
   }
    internal void AfterAwait()
    {
        if (!_inflight)
            throw new InvalidOperationException();
        _inflight = false;
        // ignore the current context, restore _thisContext
        OperationContext.Current = _thisContext;
    }
}
// ContinueOnScope extension
public static class TaskExt
{
    public static SimpleAwaiter<TResult> ContinueOnScope<TResult>(this Task<TResult> @this, FlowingOperationContextScope scope)
    {
        return new SimpleAwaiter<TResult>(@this, scope.BeforeAwait, scope.AfterAwait);
    }
    // awaiter
    public class SimpleAwaiter<TResult> :
        System.Runtime.CompilerServices.INotifyCompletion
    {
        readonly Task<TResult> _task;
        readonly Action _beforeAwait;
        readonly Action _afterAwait;
        public SimpleAwaiter(Task<TResult> task, Action beforeAwait, Action afterAwait)
        {
            _task = task;
            _beforeAwait = beforeAwait;
            _afterAwait = afterAwait;
        }
        public SimpleAwaiter<TResult> GetAwaiter()
        {
            return this;
        }
        public bool IsCompleted
        {
            get 
            {
                // don't do anything if the task completed synchronously
                // (we're on the same thread)
                if (_task.IsCompleted)
                    return true;
                _beforeAwait();
                return false;
            }
        }
        public TResult GetResult()
        {
            return _task.Result;
        }
        // INotifyCompletion
        public void OnCompleted(Action continuation)
        {
            _task.ContinueWith(task =>
            {
                _afterAwait();
                continuation();
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            SynchronizationContext.Current != null ?
                TaskScheduler.FromCurrentSynchronizationContext() :
                TaskScheduler.Current);
        }
    }
}

简单的方法是将 await 移到使用块之外

public Task<Document> GetDocumentAsync(string docId)
{
    var docClient = CreateDocumentServiceClient();
    using (new OperationContextScope(docClient.InnerChannel))
    {
        var task = docClient.GetDocumentAsync(docId);
    }
    return await task;
}

决定编写自己的代码来帮助解决这个问题,以防万一这对任何人有帮助。 与上面的 SimpleAwaiter 实现相比,似乎出错(不可预见的比赛等(少了一点,但你是评判者:

public static class WithOperationContextTaskExtensions
{
    public static ContinueOnOperationContextAwaiter<TResult> WithOperationContext<TResult>(this Task<TResult> @this, bool configureAwait = true)
    {
        return new ContinueOnOperationContextAwaiter<TResult>(@this, configureAwait);
    }
    public static ContinueOnOperationContextAwaiter WithOperationContext(this Task @this, bool configureAwait = true)
    {
        return new ContinueOnOperationContextAwaiter(@this, configureAwait);
    }
    public class ContinueOnOperationContextAwaiter : INotifyCompletion
    {
        private readonly ConfiguredTaskAwaitable.ConfiguredTaskAwaiter _awaiter;
        private OperationContext _operationContext;
        public ContinueOnOperationContextAwaiter(Task task, bool continueOnCapturedContext = true)
        {
            if (task == null) throw new ArgumentNullException("task");
            _awaiter = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
        }
        public ContinueOnOperationContextAwaiter GetAwaiter() { return this; }
        public bool IsCompleted { get { return _awaiter.IsCompleted; } }
        public void OnCompleted(Action continuation)
        {
            _operationContext = OperationContext.Current;
            _awaiter.OnCompleted(continuation);
        }
        public void GetResult()
        {
            OperationContext.Current = _operationContext;
            _awaiter.GetResult();
        }
    }
    public class ContinueOnOperationContextAwaiter<TResult> : INotifyCompletion
    {
        private readonly ConfiguredTaskAwaitable<TResult>.ConfiguredTaskAwaiter _awaiter;
        private OperationContext _operationContext;
        public ContinueOnOperationContextAwaiter(Task<TResult> task, bool continueOnCapturedContext = true)
        {
            if (task == null) throw new ArgumentNullException("task");
            _awaiter = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
        }
        public ContinueOnOperationContextAwaiter<TResult> GetAwaiter() { return this; }
        public bool IsCompleted { get { return _awaiter.IsCompleted; } }
        public void OnCompleted(Action continuation)
        {
            _operationContext = OperationContext.Current;
            _awaiter.OnCompleted(continuation);
        }
        public TResult GetResult()
        {
            OperationContext.Current = _operationContext;
            return _awaiter.GetResult();
        }
    }
}

用法(有点手动和嵌套未经测试...

    /// <summary>
    /// Make a call to the service
    /// </summary>
    /// <param name="action"></param>
    /// <param name="endpoint"> </param>
    public async Task<ResultCallWrapper<TResult>> CallAsync<TResult>(Func<T, Task<TResult>> action, EndpointAddress endpoint)
    {
        using (ChannelLifetime<T> channelLifetime = new ChannelLifetime<T>(ConstructChannel(endpoint)))
        {
            // OperationContextScope doesn't work with async/await
            var oldContext = OperationContext.Current;
            OperationContext.Current = new OperationContext((IContextChannel)channelLifetime.Channel);
            var result = await action(channelLifetime.Channel)
                .WithOperationContext(configureAwait: false);
            HttpResponseMessageProperty incomingMessageProperty = (HttpResponseMessageProperty)OperationContext.Current.IncomingMessageProperties[HttpResponseMessageProperty.Name];
            string[] keys = incomingMessageProperty.Headers.AllKeys;
            var headersOrig = keys.ToDictionary(t => t, t => incomingMessageProperty.Headers[t]);
            OperationContext.Current = oldContext;
            return new ResultCallWrapper<TResult>(result, new ReadOnlyDictionary<string, string>(headersOrig));
        }
    }

.Net 4.6.2 支持异步流。

我们有一个在.Net 4.6上运行的 ASP.Net WebApi应用程序,我们在其中使用了可接受的答案。 TaskScheduler.FromCurrentSynchronizationContext()当前同步上下文AspNetSynchronizationContext时导致死锁问题。

我相信延续任务在实际任务

之后排队,导致实际任务正在等待延续,而延续任务必须运行才能完成实际任务。 即任务都在等待对方。

因此,我通过使用继续任务更改为使用任务等待器来解决此问题。请参阅: https://blogs.msdn.microsoft.com/lucian/2012/12/11/how-to-write-a-custom-awaiter/

这个问题已经有一段时间了,但我会用我自己的自制解决方案来补充。

如果一个人不介意不做OperationContextScope,可以考虑以下几点:

扩展方法

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading.Tasks;
namespace Intexx.ServiceModel
{
    public static class WcfExtensions
    {
        [DebuggerStepThrough]
        public static void Call<TChannel>(this TChannel Client, Action<TChannel> Method) where TChannel : ICommunicationObject
        {
            try
            {
                Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }
        [DebuggerStepThrough]
        public static TResult Call<TChannel, TResult>(this TChannel Client, Func<TChannel, TResult> Method) where TChannel : ICommunicationObject
        {
            try
            {
                return Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }
        [DebuggerStepThrough]
        public async static Task CallAsync<TChannel>(this TChannel Client, Func<TChannel, Task> Method) where TChannel : ICommunicationObject
        {
            try
            {
                await Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }
        [DebuggerStepThrough]
        public async static Task<TResult> CallAsync<TChannel, TResult>(this TChannel Client, Func<TChannel, Task<TResult>> Method) where TChannel : ICommunicationObject
        {
            try
            {
                return await Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }
        private static void Cleanup<TChannel>(TChannel Client) where TChannel : ICommunicationObject
        {
            try
            {
                if (Client.IsNotNull)
                {
                    if (Client.State == CommunicationState.Faulted)
                        Client.Abort();
                    else
                        Client.Close();
                }
            }
            catch (Exception ex)
            {
                Client.Abort();
                if (!ex is CommunicationException && !ex is TimeoutException)
                    throw new Exception(ex.Message, ex);
            }
            finally
            {
                Client = null;
            }
        }
    }
}

客户端类

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading.Tasks;
namespace Reader
{
    public class Client
    {
        public static CemReaderClient Create()
        {
            Tuple<Channels.Binding, EndpointAddress, double> oService;
            try
            {
                oService = Main.Services(typeof(ICemReader));
                return new CemReaderClient(oService.Item1, oService.Item2);
            }
            catch (KeyNotFoundException ex)
            {
                return null;
            }
        }
    }
}

用法(在 VB 中,因为代码不会转换(

Using oReader As Reader.CemReaderClient = Reader.Client.Create
  If oReader.IsNotNothing Then
    Dim lIsReading = Await oReader.CallAsync(Function(Reader As Reader.CemReaderClient)
                                               Me.ConfigFilePath = If(Me.ConfigFilePath, Reader.GetConfigFilePath)
                                               Me.BackupDrive = If(Me.BackupDrive, Reader.GetBackupDrive)
                                               Me.SerialPort = If(Me.SerialPort, Reader.GetSerialPort)
                                               Me.LogFolder = If(Me.LogFolder, Reader.GetLogFolder)
                                               Return Reader.GetIsReadingAsync
                                             End Function)
  End If
End Using

我已经在客户端大约 15 次调用/秒的频率负载下在生产环境中可靠地运行了它(这与串行处理允许的速度一样快(。不过,这是在单个线程上 - 这还没有经过严格的线程安全性测试。扬子晚报.

就我而言,我决定将扩展方法滚动到它们自己的专用 NuGet 包中。事实证明,整个结构非常方便。

当然,如果最终需要OperationContextScope,就必须重新评估这一点。

Client类中具有Tuple的位用于服务发现支持。如果有人也想看到该代码,请大声喊叫,我会更新我的答案。

我有点困惑,我找到了这个 博客:WCF 中基于任务的异步操作

这是一个异步 wcf 通信:

[ServiceContract]
public interface IMessage
{
    [OperationContract]
    Task<string> GetMessages(string msg);
}
public class MessageService : IMessage
{
   async Task<string> IMessage.GetMessages(string msg)
   {
      var task = Task.Factory.StartNew(() =>
                                     {
                                         Thread.Sleep(10000);
                                         return "Return from Server : " + msg;
                                     });
     return await task.ConfigureAwait(false);
   }
}

客户:

var client = new Proxy("BasicHttpBinding_IMessage");
       var task = Task.Factory.StartNew(() => client.GetMessages("Hello"));
       var str = await task;

那么这也是一个好方法吗?

我遇到了同样的问题,但是我突然意识到我根本不需要使用 async/await。

由于您没有对结果进行后处理,因此无需等待回复。 如果您确实需要处理结果,只需使用旧方式的 TPL 延续。

public Task<MyDomainModel> GetHomeInfoAsync(DateTime timestamp)
{
    using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
    {
        return helper.Proxy.GetHomeInfoAsync(timestamp).ContinueWith(antecedent=>processReplay(antecedent.Result));
    }
}

我不知道这是否有帮助,但是在我搜索回答同一问题时看到这个问题后,我遇到了这个问题。

由此开始,我认为您的代码应该如下所示:

public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
    using (var client = CreateDocumentServiceClient())
    {
        await client.BeginGetHomeInfoAsync(timestamp);
    }
}

我意识到我的回答来得很晚:P但它可能会帮助其他人。