如何使用后台加载器将未知数量的字节流式传输到服务器
本文关键字:字节流 传输 服务器 未知数 后台 何使用 加载 | 更新日期: 2023-09-27 17:49:31
>我有一个繁重的IO/CPU转换'过程',从文件类型A转换为B.目前我将其写到存储文件,然后在转换完成后使用BackgroundUploader上传。
但是,我想在输出文件仍在生成时更快地开始流式传输。此外,我什至不一定想创建输出存储文件,而是"随时随地"上传。
请注意,我不知道转换过程中输出文件的最终大小,它可能比源更小或更大。
首先,我尝试在写入接收器输出 StorageFile 时简单地打开它,并将该流传递给 BackgroundUploader,但这会导致"竞争条件",当它到达写入 StorageFile 的字节末尾时(当它赶上转换工作时(,上传终止。
除了流式传输到存储文件,我还可以将输出字节写入缓冲区,例如 2KiB。我想在转换写入后上传此缓冲区。
//simplified code...
uint bytesRead = 0;
byte[] buffer = new byte[buff_sz];
bytesRead = converter.Read(buffer);
while(bytesRead > 0)
{
// here I would like to 'upload' the data (maybe using BackgroundUploader? or some other API?)
bytesRead = converter.Read(buffer);
}
我不确定如何在不遇到与以前相同的"竞争条件"的情况下做到这一点。如何保持后台上传程序运行,直到我完成将新字节放入缓冲区?
注意 1:最后一次迭代将是 bytesRead <buff_sz,所以我也需要以某种方式传达这一点,以免在最后上传垃圾字节。>
注 2:转换代码位于跨平台C++共享库中。
谢谢!
补足的
根据Nate Diamond的建议,我研究了IINputStream界面。下面的代码有效,虽然可能是你能写的最差的性能版本,但足以证明这个概念。
从这里开始,遵循基于"maxim pg"实现的包装器代码。如何在 C# 中实现 IRandomAccessStream?
public IAsyncOperationWithProgress<IBuffer, UInt32> ReadAsync(
IBuffer buffer,// The buffer into which the asynchronous read operation places the bytes that are read.
uint count,// The number of bytes to read that is less than or equal to the Capacity value.
InputStreamOptions options) // Specifies the type of the asynchronous read operation.
{
if (buffer == null) throw new ArgumentNullException("buffer");
Func<CancellationToken, IProgress<uint>, Task<IBuffer>> taskProvider =
(token, progress) => ReadBytesAsync(buffer, count, token, progress, options);
return AsyncInfo.Run(taskProvider);
}
private Task<IBuffer> ReadBytesAsync(
IBuffer buffer,
uint count,
CancellationToken token,
IProgress<uint> progress,
InputStreamOptions options)
{
TaskCompletionSource<IBuffer> cts = new TaskCompletionSource<IBuffer>();
try
{
var ignore = ThreadPool.RunAsync((handler) => {
_buffer = new byte[count];
uint bytesRead = _reader.Read(_buffer); // this triggers file conversion work
buffer.Length = bytesRead; // this is important apparently, otherwise no data written!
Stream stream = buffer.AsStream();
stream.Write(_buffer, 0, (int)bytesRead);
stream.Flush();
cts.TrySetResult(buffer);
});
}
catch(Exception e)
{
cts.SetException(e);
}
return cts.Task;
}
最简单的方法是创建自定义IInputStream
。这是一个只有一种方法的接口,ReadAsync
.
然后,您所要做的就是创建您的CustomInputStream
,当它有足够的内容来填充请求的块时,它会不断允许输入并从ReadAsync
返回。它还具有一个Close
功能,可以告诉它已完成。
其他人已经在IRandomAccessStream
成功地实施了类似的东西。