TPL数据流块在WCF双工中

本文关键字:WCF 数据流 TPL | 更新日期: 2023-09-27 18:05:10

我是SO的新手,请原谅我。

我有一个具有双工服务契约的WCF服务。这个服务契约有一个操作联系,该联系假定要进行长时间的数据处理。我不得不限制并发数据处理的数量,比如说最多3个。我的问题是,在数据处理之后,我需要返回到相同的服务实例上下文,因此我调用传递数据处理结果的启动器端点。我需要提到,由于各种原因,我被限制为TPL数据流和WCF双工。

这是我到目前为止写的一个演示

在控制台库中我模拟WCF调用

class Program
{
    static void Main(string[] args)
    {
        // simulate service calls
        Enumerable.Range(0, 5).ToList().ForEach(x =>
        {
            new System.Threading.Thread(new ThreadStart(async () =>
            {
                var service = new Service();
                await service.Inc(x);
            })).Start();
        });
    }
}

这里假定是WCF服务

// service contract
public class Service
{
    static TransformBlock<Message<int>, Message<int>> transformBlock;
    static Service()
    {
        transformBlock = new TransformBlock<Message<int>, Message<int>>(x => Inc(x), new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3
        });
    }
    static Message<int> Inc(Message<int> input)
    {
        System.Threading.Thread.Sleep(100);
        return new Message<int> { Token = input.Token, Data = input.Data + 1 };
    }
    // operation contract
    public async Task Inc(int id)
    {
        var token = Guid.NewGuid().ToString();
        transformBlock.Post(new Message<int> { Token = token, Data = id });
        while (await transformBlock.OutputAvailableAsync())
        {
            Message<int> message;
            if (transformBlock.TryReceive(m => m.Token == token, out message))
            {
                // do further processing using initiator service instance members
                // something like Callback.IncResult(m.Data);
                break;
            }
        }
    }
}
public class Message<T>
{
    public string Token { get; set; }
    public T Data { get; set; }
}

操作契约并不一定是异步的,但是我需要OutputAvailableAsync通知。

这是一个好方法,还是有一个更好的解决方案为我的场景?

TPL数据流块在WCF双工中

首先,我认为您不应该这样使用标记。唯一标识符在进程间通信时非常有用。但是当你在一个单独的进程中,只使用引用相等。

实际上回答你的问题,我认为繁忙循环不是一个好主意。

异步节流的一个更简单的解决方案是使用SemaphoreSlim。比如:

static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);
// operation contract
public async Task Inc(int id)
{
    await Semaphore.WaitAsync();
    try
    {
        Thread.Sleep(100);
        var result = id + 1;
        // do further processing using initiator service instance members
        // something like Callback.IncResult(result);
    }
    finally
    {
        Semaphore.Release();
    }
}

如果你真的想(或者必须)使用数据流,你可以使用TaskCompletionSource在操作和块之间进行同步。操作方法将等待TaskCompletionSourceTask,并且块将在完成对该消息的计算时设置它:

private static readonly ActionBlock<Message<int>> Block =
    new ActionBlock<Message<int>>(
        x => Inc(x),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3
        });
static void Inc(Message<int> input)
{
    Thread.Sleep(100);
    input.TCS.SetResult(input.Data + 1);
}
// operation contract
public async Task Inc(int id)
{
    var tcs = new TaskCompletionSource<int>();
    Block.Post(new Message<int> { TCS = tcs, Data = id });
    int result = await tcs.Task;
    // do further processing using initiator service instance members
    // something like Callback.IncResult(result);
}