TPL FromAsync任务一直被取消

本文关键字:取消 一直 任务 FromAsync TPL | 更新日期: 2023-09-27 18:27:05

我在一个循环中从客户端向服务器发送10条消息,然后使用TPL的FromAsync(目标为.NET 4.0)从服务器向客户端返回响应

从客户端发送最后一条消息后,我将尝试测量收到最终响应所需的总时间。为此,我调用Task.WaitAll,但我得到了一个AggregateException,它的内部异常为所有任务显示"一个任务被取消"。服务器正在接收请求。

代码:

private static double MeasureAsyncTPL(List<string> stringList)
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        var tasks = new Task[stringList.Count];
        for (int i = 0; i < stringList.Count; i++)
        {
            string source = stringList[i];
            tasks[i] = SendMessageToServerAsync(source);
            //SendMessageToServerAsync(source);
        }
        Task.WaitAll(tasks);

        stopwatch.Stop();            
        return stopwatch.Elapsed.TotalMilliseconds; 
    }
        static Task SendMessageToServerAsync(string message)
    {
        Task task;
        var byteArray = MessageToByteArray(message, Encoding.UTF8);
        using (var tcpClient = new TcpClient())
        {
            tcpClient.Connect("127.0.0.1", 5000);
            using (var networkStream = tcpClient.GetStream())
            {
                task = Write(networkStream,byteArray,0);
                Task continuation = task.ContinueWith(ant => Read(ant, networkStream), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent);
                return continuation;
            }
        }
    }
        private static Task Write(NetworkStream stream, byte[] buffer, int offset)
    {
        return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, buffer.Length, null);
    }
    private static Task Read(Task write, NetworkStream stream)
    {
        byte[] data = new byte[50];
        return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, data, 0, data.Length, null);
    }

异常是在MeasureSyncTPL内的Task.WaitAll(tasks)行上发现的。如何修复此异常?

第二个问题-我的Read方法中的FromAsync是否保证从服务器返回整个消息?还是我需要多次调用它,同时跟踪服务器发送的字节数组的大小?

TPL FromAsync任务一直被取消

在写入和读取操作开始或有时间工作之前,您将处理networkStreamtcpClient

使用.NET 4.0,你需要做一些像这样复杂的事情(这是一个开始,它需要更多的工作):

var byteArray = MessageToByteArray(message, Encoding.UTF8);
var tcpClient = new TcpClient();
NetworkStream networkStream = null;
try
{
    tcpClient.Connect("127.0.0.1", 5000);
    networkStream = tcpClient.GetStream();
}
catch
{
    ((IDisposable)tcpClient).Dispose();
    throw;
}
byte[] read_buffer = new byte[50];
Action dispose_action = () =>
{
    ((IDisposable)tcpClient).Dispose();
    networkStream.Dispose();
};
var write_task = networkStream.WriteAsync(byteArray, 0, byteArray.Length);
write_task
    .ContinueWith(wt =>
    {
        var read_task = networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);

        read_task.ContinueWith(rt =>
        {
            //See if rt task is successful and if so, process read data here
            dispose_action();
        });
    },
    TaskContinuationOptions.OnlyOnRanToCompletion);
write_task.ContinueWith(wt => dispose_action, TaskContinuationOptions.NotOnRanToCompletion);

上面代码的主要思想是,在完成读/写操作之前,您不会处理TCP客户端和网络流对象。

想象一下,如果你想在某种循环中多次阅读,这段代码会变得多么复杂。

使用.NET 4.5,您可以使用asyncawait来做一些(更简单的)事情,比如:

static async Task SendMessageToServerAsync(string message)
{
    var byteArray = MessageToByteArray(message, Encoding.UTF8);
    using (var tcpClient = new TcpClient())
    {
        tcpClient.Connect("127.0.0.1", 5000);
        using (var networkStream = tcpClient.GetStream())
        {
            await networkStream.WriteAsync(byteArray, 0, byteArray.Length);
            byte[] read_buffer = new byte[50];
            int read = await networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);
            // do something with the read_buffer
        }
    }
}