使用 AWS S3 开发工具包(适用于 .NET)从 Amazon S3 下载并行批处理文件
本文关键字:S3 Amazon 并行 批处理文件 NET 下载 适用于 开发 AWS 开发工具 工具包 | 更新日期: 2023-09-27 18:33:36
问题:我想使用他们的 .NET 开发工具包从 AWS S3 并行下载 100 个文件。下载的内容应该存储在 100 个内存流中(文件足够小,我可以从那里获取(。我在 .NET 4.0 中的 Task、IAsyncResult、Parallel.* 和其他不同方法之间感到困惑。
如果我尝试自己解决问题,我会想象像这样的东西:(编辑以向某些变量添加类型(
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;
// Prepare to launch requests
var asyncRequests = from rq in requestObjects
select _s3.BeginGetObject(rq,null,null);
// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();
// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched
select _s3.EndGetRequest(rq);
// Finish requests
var actualResponses = responses.ToList();
// Fetch data
var data = actualResponses.Select(rp => {
var ms = new MemoryStream();
rp.ResponseStream.CopyTo(ms);
return ms;
});
此代码并行启动 100 个请求,这很好。但是,存在两个问题:
- 最后一条语句将串行下载文件,而不是并行下载文件。流中似乎没有 BeginCopyTo((/EndCopyTo(( 方法...
- 在所有请求响应之前,前面的语句不会放开。换句话说,在全部文件启动之前,任何文件都不会开始下载。
所以在这里我开始认为我走错了路......
帮助?
如果将操作分解为一个异步处理一个请求然后调用它 100 次的方法,可能会更容易。
首先,让我们确定您想要的最终结果。 由于您将使用的是MemoryStream
这意味着您需要从方法返回Task<MemoryStream>
。 签名将如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
由于 AmazonS3
对象实现了异步设计模式,因此可以在 TaskFactory
类上使用 FromAsync
方法从实现异步设计模式的类生成Task<T>
,如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);
// But what goes here?
所以你已经处于一个好地方,你有一个Task<T>
,你可以等待或在通话完成时得到回拨。 但是,您需要以某种方式将从调用Task<GetObjectResponse>
返回的GetObjectResponse
转换为MemoryStream
。
为此,您希望在 Task<T>
类上使用 ContinueWith
方法。 可以将其视为Enumerable
类上 Select
方法的异步版本,它只是对另一个Task<T>
的投影,只是每次调用 ContinueWith
时,您都可能创建一个运行该部分代码的新任务。
这样,您的方法如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);
// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t ){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});
// Return the full task chain.
return translation;
}
请注意,在上面,您可以调用ContinueWith
传递TaskContinuationOptions.ExecuteSynchronously
的重载,因为看起来您正在做最少的工作(我无法判断,响应可能很大(。 如果您正在执行非常小的工作,并且为了完成工作而启动新任务是有害的,则应传递TaskContinuationOptions.ExecuteSynchronously
,以免浪费时间为最少的操作创建新任务。
现在您已经有了可以将一个请求转换为Task<MemoryStream>
的方法,创建一个将处理任意数量的包装器很简单:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}
在上面,您只需获取GetObjectRequest
实例的序列,它将返回一个Task<MemoryStream>
数组。 它返回物化序列的事实很重要。 如果在返回之前未具体化它,则在循环访问序列之前不会创建任务。
当然,如果你想要这种行为,那么一定要删除对.ToArray()
的调用,让方法返回IEnumerable<Task<MemoryStream>>
,然后在你迭代任务时发出请求。
从那里,您可以一次处理一个(在循环中使用 Task.WaitAny
方法(或等待所有它们完成(通过调用 Task.WaitAll
方法(。 后者的一个例子是:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}
另外,应该提到的是,这非常适合反应式扩展框架,因为它非常适合IObservable<T>
实现。
您可以使用 Nexus.Core 包中的 Nexus.Tasks。
var response = await fileNames
.WhenAll(item => GetObject(item, cancellationToken), 10, cancellationToken)
.ConfigureAwait(false);