Downloading files asynchronously and in parallel
Keywords: file, download, parallel, asynchronous | Updated: 2023-09-27 18:34:08
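Edit
I changed the title of the question to reflect the problem I ran into, and I also posted an answer showing how to achieve this easily.
I am trying to make the second method return Task<TResult> instead of Task as the first method does, but every attempt to fix it triggers a cascade of errors.
- I added return before await body(partition.Current);
- That in turn required a return statement further down, so I added return null below it
- Now the select statement complains that it cannot infer the type arguments from the query
- I changed Task.Run to Task.Run<TResult>, without success.
How can I fix this?
The first method comes from http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx; the second is the overload I am trying to create.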
public static class Extensions
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    public static Task ForEachAsync<T, TResult>(this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}
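For what it's worth, here is one way the second overload could be made to return results. This is only a sketch under a few assumptions: the return type becomes Task<TResult[]> (one result per item) rather than Task<TResult>, the class name ForEachAsyncSketch is made up, and the results come back grouped by partition, not in source order. The idea is to avoid returning from inside the loop (which would end a partition after its first item) and to give the delegate a single return type, so that Task.Run and Task.WhenAll can infer their type arguments.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Hypothetical overload: same partitioning as the first method,
    // but every partition worker collects the results of its items.
    public static async Task<TResult[]> ForEachAsync<T, TResult>(
        this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
    {
        IEnumerable<Task<List<TResult>>> workers =
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async () =>
            {
                var results = new List<TResult>();
                using (partition)
                    while (partition.MoveNext())
                        results.Add(await body(partition.Current));
                // Single return statement, so the lambda's type is Task<List<TResult>>.
                return results;
            });

        // Task.WhenAll over Task<List<TResult>> yields one list per partition.
        List<TResult>[] perPartition = await Task.WhenAll(workers);
        return perPartition.SelectMany(list => list).ToArray();
    }
}
```

With something like this in scope, the incomplete DownloadFiles from the question could presumably end with return await enumerable.ForEachAsync(5, s => DownloadFile(s));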
Usage example:
With this method I would like to download multiple files in parallel and asynchronously:
private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    Artist artist = await GetArtist();
    IEnumerable<string> enumerable = artist.Reviews.Select(s => s.ImageUrl);
    string[] downloadFile = await DownloadFiles(enumerable);
}

public static async Task<string[]> DownloadFiles(IEnumerable<string> enumerable)
{
    if (enumerable == null) throw new ArgumentNullException("enumerable");
    await enumerable.ForEachAsync(5, s => DownloadFile(s));
    // Incomplete, the above statement is void and can't be returned
}

public static async Task<string> DownloadFile(string address)
{
    /* Download a file from specified address,
     * return destination file name on success or null on failure */
    if (address == null)
    {
        return null;
    }
    Uri result;
    if (!Uri.TryCreate(address, UriKind.Absolute, out result))
    {
        Debug.WriteLine(string.Format("Couldn't create URI from specified address: {0}", address));
        return null;
    }
    try
    {
        using (var client = new WebClient())
        {
            string fileName = Path.GetTempFileName();
            await client.DownloadFileTaskAsync(address, fileName);
            Debug.WriteLine(string.Format("Downloaded file saved to: {0} ({1})", fileName, address));
            return fileName;
        }
    }
    catch (WebException webException)
    {
        Debug.WriteLine(string.Format("Couldn't download file from specified address: {0}", webException.Message));
        return null;
    }
}
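I solved it and am posting the solution here; it may help anyone who runs into the same problem.
What I originally needed was a small helper that downloads images quickly, but also drops the connection if the server does not respond fast enough, all of it in parallel and asynchronously.
This helper returns a tuple containing the remote path, the local path and the exception (if one occurred). That is quite useful, because it is always good to know why a failed download failed. I don't think I have forgotten any situation that can arise during a download, but you are welcome to comment.
- You specify a list of URLs to download
- You can specify the local file name to save each file to; if you don't, one is generated for you
- (Optional) a duration after which the download is cancelled (handy for slow or unreachable servers)
You can use DownloadFileTaskAsync on its own, or use the ForEachAsync helper for parallel and asynchronous downloads.
Code with an example of how to use it: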
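Separately from the author's implementation that follows, the "cancel after a time-out and report the exception instead of throwing" idea can be sketched in a generic form. The names TimeoutSketch and TryWithTimeoutAsync are made up for illustration, and the sketch assumes the operation accepts and honors a CancellationToken. WebClient.DownloadFileTaskAsync does not take a token, which is presumably why the author's code uses a Timer together with CancelAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical helper: runs an operation with a time-out and returns
    // (value, exception) instead of letting the exception propagate.
    public static async Task<Tuple<T, Exception>> TryWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation, int timeOutMilliseconds)
    {
        // The token source cancels itself once the delay has elapsed.
        using (var cts = new CancellationTokenSource(timeOutMilliseconds))
        {
            try
            {
                T value = await operation(cts.Token);
                return Tuple.Create(value, (Exception)null);
            }
            catch (Exception ex)
            {
                // Time-outs surface here as an OperationCanceledException.
                return Tuple.Create(default(T), ex);
            }
        }
    }
}
```

A caller would then inspect Item2 of the returned tuple to find out why a download failed, much like the third element of the tuple described above.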
private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    IEnumerable<string> enumerable = your urls here;
    var results = new List<Tuple<string, string, Exception>>();
    await enumerable.ForEachAsync(s => DownloadFileTaskAsync(s, null, 1000), (url, t) => results.Add(t));
}

/// <summary>
///     Downloads a file from a specified Internet address.
/// </summary>
/// <param name="remotePath">Internet address of the file to download.</param>
/// <param name="localPath">
///     Local file name where to store the content of the download, if null a temporary file name will
///     be generated.
/// </param>
/// <param name="timeOut">Duration in milliseconds before cancelling the operation.</param>
/// <returns>A tuple containing the remote path, the local path and an exception if one occurred.</returns>
private static async Task<Tuple<string, string, Exception>> DownloadFileTaskAsync(string remotePath,
    string localPath = null, int timeOut = 3000)
{
    try
    {
        if (remotePath == null)
        {
            Debug.WriteLine("DownloadFileTaskAsync (null remote path): skipping");
            throw new ArgumentNullException("remotePath");
        }
        if (localPath == null)
        {
            Debug.WriteLine(
                string.Format(
                    "DownloadFileTaskAsync (null local path): generating a temporary file name for {0}",
                    remotePath));
            localPath = Path.GetTempFileName();
        }
        using (var client = new WebClient())
        {
            // Cancels the download if it is still running when the timer fires.
            TimerCallback timerCallback = c =>
            {
                var webClient = (WebClient) c;
                if (!webClient.IsBusy) return;
                webClient.CancelAsync();
                Debug.WriteLine(string.Format("DownloadFileTaskAsync (time out due): {0}", remotePath));
            };
            using (var timer = new Timer(timerCallback, client, timeOut, Timeout.Infinite))
            {
                await client.DownloadFileTaskAsync(remotePath, localPath);
            }
            Debug.WriteLine(string.Format("DownloadFileTaskAsync (downloaded): {0}", remotePath));
            return new Tuple<string, string, Exception>(remotePath, localPath, null);
        }
    }
    catch (Exception ex)
    {
        return new Tuple<string, string, Exception>(remotePath, null, ex);
    }
}

public static class Extensions
{
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        var oneAtATime = new SemaphoreSlim(5, 10);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        // taskSelector starts before the semaphore is taken, so the semaphore only
        // limits concurrent resultProcessor calls, not the number of downloads in flight.
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();
        try
        {
            resultProcessor(item, result);
        }
        finally
        {
            oneAtATime.Release();
        }
    }
}
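I have not changed the signature of ForEachAsync to let you choose the level of parallelism; I will let you adjust it as you see fit.
Sample output: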
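If you do want to choose the level of parallelism, one possible adjustment is sketched below. It is not the author's code: the names ThrottledExtensions, ForEachThrottledAsync and maxParallelism are made up. Here the semaphore is acquired before the task selector runs, so at most maxParallelism operations are in flight at once, and a lock keeps result processing to one call at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions
{
    // Hypothetical variant with an explicit degree of parallelism.
    public static async Task ForEachThrottledAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        int maxParallelism,
        Func<TSource, Task<TResult>> taskSelector,
        Action<TSource, TResult> resultProcessor)
    {
        using (var throttle = new SemaphoreSlim(maxParallelism, maxParallelism))
        {
            var resultLock = new object();
            List<Task> tasks = source.Select(async item =>
            {
                // Wait for a free slot before starting the operation.
                await throttle.WaitAsync();
                try
                {
                    TResult result = await taskSelector(item);
                    // Serialize result processing so a plain List<T> can be used.
                    lock (resultLock) resultProcessor(item, result);
                }
                finally
                {
                    throttle.Release();
                }
            }).ToList<Task>();

            await Task.WhenAll(tasks);
        }
    }
}
```

A call would look like await urls.ForEachThrottledAsync(5, s => DownloadFileTaskAsync(s, null, 1000), (url, t) => results.Add(t));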
DownloadFileTaskAsync (null local path): generating a temporary file name for http://cache.thephoenix.com/secure/uploadedImages/The_Phoenix/Music/CD_Review/main_OTR_Britney480.jpg
DownloadFileTaskAsync (null local path): generating a temporary file name for http://ssimg.soundspike.com/artists/britneyspears_femmefatale_cd.jpg
DownloadFileTaskAsync (null local path): generating a temporary file name for http://a323.yahoofs.com/ymg/albumreviewsuk__1/albumreviewsuk-526650850-1301400550.jpg?ymm_1xEDE5bu0tMi
DownloadFileTaskAsync (null remote path): skipping
DownloadFileTaskAsync (time out due): http://hangout.altsounds.com/geek/gars/images/3/9/8/5/2375.jpg
DownloadFileTaskAsync (time out due): http://www.beat.com.au/sites/default/files/imagecache/630_315sr/images/article/header/2011/april/britney-spears-femme-fatale.jpg
DownloadFileTaskAsync (time out due): http://cache.thephoenix.com/secure/uploadedImages/The_Phoenix/Music/CD_Review/main_OTR_Britney480.jpg
DownloadFileTaskAsync (downloaded): http://newblog.thecmuwebsite.com/wp-content/uploads/2009/12/britneyspears1.jpg
DownloadFileTaskAsync (downloaded): http://newblog.thecmuwebsite.com/wp-content/uploads/2009/12/britneyspears1.jpg
DownloadFileTaskAsync (downloaded): http://static.guim.co.uk/sys-images/Music/Pix/site_furniture/2011/3/22/1300816812640/Femme-Fatale.jpg
DownloadFileTaskAsync (downloaded): http://www.sputnikmusic.com/images/albums/72328.jpg
What used to take 1 minute now takes only 10 seconds for the same result :)
Many thanks to the authors of these 2 posts:
http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx
http://blogs.msdn.com/b/pfxteam/archive/2012/03/04/10277325.aspx
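For those who, like me, landed here recently: I just found out that Parallel.ForEachAsync is implemented in .NET 6 (https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0).
It is used just as you would expect, and it has the advantage of letting you specify the degree of parallelism. For example: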
var someInputData = new []
{
    "someData1",
    "someData2",
    "someData3"
};
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
await Parallel.ForEachAsync(someInputData, parallelOptions,
    async (input, cancellationToken) =>
    {
        // Some async Func.
        // The Func can make use of:
        // - the `input` variable, which will contain the element in the `someInputData` list;
        // - the `cancellationToken` variable, usable to cancel the async operation.
    });
See also: https://www.hanselman.com/blog/parallelforeachasync-in-net-6
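Since Parallel.ForEachAsync does not return anything per item, collecting results (as the original question wanted) needs a thread-safe container. A minimal sketch, assuming .NET 6 or later; ParallelForEachAsyncSketch, RunAllAsync and the work delegate are hypothetical names, and duplicate inputs would overwrite each other in the dictionary:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachAsyncSketch
{
    // Hypothetical helper: runs `work` for every input with at most `dop`
    // operations in flight, and returns the results keyed by input.
    public static async Task<IReadOnlyDictionary<string, TResult>> RunAllAsync<TResult>(
        IEnumerable<string> inputs, int dop,
        Func<string, CancellationToken, Task<TResult>> work)
    {
        var results = new ConcurrentDictionary<string, TResult>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = dop };

        await Parallel.ForEachAsync(inputs, options, async (input, cancellationToken) =>
        {
            // Safe to write from several workers at once.
            results[input] = await work(input, cancellationToken);
        });

        return results;
    }
}
```

For the download scenario, work would be whatever method fetches one URL and returns the local file name.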