并行任务vs委托

本文关键字:委托 vs 并行任务 | 更新日期: 2023-09-27 17:53:39

我现在需要决定如何构建我的异步代码。我需要从函数Func1()中同时调用20个不同的web服务,当它们都返回xml答案时,将所有结果连接到一个大xml。

我考虑过使用TPL任务。像这样:

var task = Task.Factory.StartNew (call the web service1...);
var task2 = Task.Factory.StartNew (call the web service2...);
var task3 = Task.Factory.StartNew (call the web service3...);
task.WaitAll();
这听起来不错还是有更好的方法来完成这项工作?

并行任务vs委托

几个月前我们需要这样的东西来并发处理多个远程URL。我们通过从SemaphoreSlim类派生我们自己的类来实现这一点。

你可以这样实现:

/// <summary>
    /// Can be used to process multiple URL's concurrently.
    /// </summary>
    public class ConcurrentUrlProcessor : SemaphoreSlim
    {
        private int initialCount;
        private int maxCount;
        private readonly HttpClient httpClient;
        /// <summary>
        /// Initializes a new instance of the <see cref="T:System.Threading.SemaphoreSlim" /> class, specifying the initial number of requests that can be granted concurrently.
        /// </summary>
        /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param>
        public ConcurrentUrlProcessor(int initialCount)
            :base(initialCount)
        {
            this.initialCount = initialCount;
            this.maxCount = int.MaxValue;
            this.httpClient = new HttpClient();
        }
        /// <summary>
        /// Initializes a new instance of the <see cref="T:System.Threading.SemaphoreSlim" /> class, specifying the initial and maximum number of requests that can be granted concurrently.
        /// </summary>
        /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param>
        /// <param name="maxCount">The maximum number of requests for the semaphore that can be granted concurrently.</param>
        public ConcurrentUrlProcessor(int initialCount, int maxCount)
            : base(initialCount, maxCount) 
        {
            this.initialCount = initialCount;
            this.maxCount = maxCount;
            this.httpClient = new HttpClient();
        }
        /// <summary>
        /// Starts the processing.
        /// </summary>
        /// <param name="urls">The urls.</param>
        /// <returns>Task{IEnumerable{XDocument}}.</returns>
        public virtual async Task<IEnumerable<XDocument>> StartProcessing(params string[] urls)
        {
            List<Task> tasks = new List<Task>();
            List<XDocument> documents = new List<XDocument>();
            SemaphoreSlim throttler = new SemaphoreSlim(initialCount, maxCount);
            foreach (string url in urls)
            {
                await throttler.WaitAsync();
                tasks.Add(Task.Run(async () =>
                {
                    try
                    {
                        string xml = await this.httpClient.GetStringAsync(url);
                        //move on to the next page if no xml is returned.
                        if (string.IsNullOrWhiteSpace(xml))
                            return;
                        var document = XDocument.Parse(xml);
                        documents.Add(document);
                    }
                    catch (Exception)
                    {
                        //TODO: log the error or do something with it.
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
            }
            await Task.WhenAll(tasks);
            return documents;
        }
    }

和相应的单元测试:

    [Test]
    public async void CanProcessMultipleUrlsTest()
    {
        string[] urls = new string[] 
        {
            "http://google.nl",
            "http://facebook.com",
            "http://linkedin.com",
            "http://twitter.com" 
        };
        IEnumerable<XDocument> documents = null;
        ConcurrentUrlProcessor processor = new ConcurrentUrlProcessor(100);
        documents = await processor.StartProcessing(urls);
        Assert.AreEqual(4, documents.Count());
    }

我想到了两个方法。

。您现在使用的方法是使用本回答和本文中描述的延续ContinueWith/ContinueWhenAll。因此,对于您的情况,您可以使用使用子任务的单个延续,因此

TaskCreationoptions op = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() => 
{
    var task1 = Task.Factory.StartNew (CallService(1));
    var task2 = Task.Factory.StartNew (CallService(2));
    var task3 = Task.Factory.StartNew (CallService(3));
})
.ContinueWith(ant => { SomeOtherselegate });

或者您可以像这里解释的那样链接这些延续。

另一种方法是使用ContinueWhenAll

var task1 = Task.Factory.StartNew (CallService(1));
var task2 = Task.Factory.StartNew (CallService(2));
var task3 = Task.Factory.StartNew (CallService(3));
var continuation = Task.Factory.ContinueWhenAll(
    new[] { task1, task2, task3 }, tasks => Console.WriteLine("Done!"));

这里唯一要考虑的是你可以有一个可变数量的任务,但这很简单,我会让你自己解决这个问题。

另一种方法是使用。net4.5 +和async/await。你的代码应该是这样的

private async void CallAllServicesAsync()
{
    await CallServiceAsync(1);
    await CallServiceAsync(2);
    await CallServiceAsync(3);
}

,

private Task CallServiceAsync(int serviceNumber)
{
    return Task.Run(() = > { SomeMethod(); });
}

上面的代码相当于第一个代码,但是框架会在底层为你处理所有的事情。

我只能想到两种主要的方法:

  1. 有一些地方,你会聚集在一起,一旦收到。这可以通过ContinueWith()方法来完成。但是,您需要在聚合代码中处理同步,并且最终仍然需要等待,直到所有任务都完成。因此,只有当聚合需要很长时间并且可以并行完成时,这种方法才有意义。

  2. 你做的方式-很好很简单:)

至于使用async,我会投票使用TaskFactory.ContinueWhenAll()方法。这样您就不会阻塞任何线程,代码将比有多个等待(取决于品味)看起来更好,并且可能具有更少的开销(可能取决于实现)。

使用async的正确方法是首先定义一个自然异步的CallServiceAsync(即使用HttpClient或者是Begin/End方法周围的TaskFactory.FromAsync包装器)。不应该使用Task.Run

一旦你有了一个自然异步的CallServiceAsync,那么你可以同时发出20个调用,并(异步地)等待它们:

public async Task<XDocument> GetAllDataAsync()
{
  var task1 = CallServiceAsync(1);
  var task2 = CallServiceAsync(2);
  var task3 = CallServiceAsync(3);
  ...
  XDocument[] results = await task.WhenAll(task1, task2, task3, ...);
  return JoinXmlDocuments(results);
}

这种方法根本不会阻塞任何线程。

您可以通过使用ConfigureAwait(false)使其性能稍微提高:

XDocument[] results = await task.WhenAll(task1, task2, task3, ...).ConfigureAwait(false);