流式HTTP,GZIP由StreamReader缓冲
本文关键字:StreamReader 缓冲 GZIP HTTP 流式 | 更新日期: 2023-09-27 17:58:18
努力寻找遇到类似问题或类似问题的人。
我目前正在http(json)上使用一个有GZip要求的流,从发送数据到reader.ReadLine()
读取数据都有延迟。有人建议我,这可能与解码将数据保留在缓冲区有关?
这就是我目前所拥有的,除了延迟之外,它运行良好。
HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
request.Method = "GET";
request.PreAuthenticate = true;
request.Credentials = new NetworkCredential(username, password);
request.AutomaticDecompression = DecompressionMethods.GZip;
request.ContentType = "application/json";
request.Accept = "application/json";
request.Timeout = 30;
request.BeginGetResponse(AsyncCallback, request);
然后在AsyncCallback方法中,我有:
HttpWebRequest request = result.AsyncState as HttpWebRequest;
using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
{
while (!reader.EndOfStream)
{
string line = reader.ReadLine();
if (string.IsNullOrWhiteSpace(line)) continue;
Console.WriteLine(line);
}
}
它只是停留在reader.Readline()
上,直到接收到更多的数据,然后甚至保留一些数据。还收到了保持活动的换行符,当它决定读取某个内容时,这些换行符通常会同时被读取。
我测试了与curl命令并行运行的流,curl命令接收和解压缩数据非常好。
任何见解都会很棒。谢谢,
Dan
编辑在streamreader上使用缓冲区大小时运气不佳。
new StreamReader(stream, Encoding.UTF8, true, 1)
编辑也没有运气更新到.NET 4.5和使用
request.AllowReadStreamBuffering = false;
更新:在较高的卷速率下,这似乎会在很长一段时间内出现问题,并且应该只在缓冲区影响应用程序功能的小卷上使用。从那以后,我又换回了StreamReader
。
所以这就是我最终想到的。这是有效的,没有延误。这不会通过自动GZip解压缩得到缓冲。
using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (MemoryStream memory = new MemoryStream())
using (GZipStream gzip = new GZipStream(memory, CompressionMode.Decompress))
{
byte[] compressedBuffer = new byte[8192];
byte[] uncompressedBuffer = new byte[8192];
List<byte> output = new List<byte>();
while (stream.CanRead)
{
int readCount = stream.Read(compressedBuffer, 0, compressedBuffer.Length);
memory.Write(compressedBuffer.Take(readCount).ToArray(), 0, readCount);
memory.Position = 0;
int uncompressedLength = gzip.Read(uncompressedBuffer, 0, uncompressedBuffer.Length);
output.AddRange(uncompressedBuffer.Take(uncompressedLength));
if (!output.Contains(0x0A)) continue;
byte[] bytesToDecode = output.Take(output.LastIndexOf(0x0A) + 1).ToArray();
string outputString = Encoding.UTF8.GetString(bytesToDecode);
output.RemoveRange(0, bytesToDecode.Length);
string[] lines = outputString.Split(new[] { Environment.NewLine }, new StringSplitOptions());
for (int i = 0; i < (lines.Length - 1); i++)
{
Console.WriteLine(lines[i]);
}
memory.SetLength(0);
}
}
Evenhuis讨论的延迟ACK C.可能有一些问题,但我有一种奇怪的直觉,是StreamReader
让你头疼。。。你可以试试这样的东西:
public void AsyncCallback(IAsyncResult result)
{
HttpWebRequest request = result.AsyncState as HttpWebRequest;
using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
{
var buffer = new byte[2048];
while(stream.CanRead)
{
var readCount = stream.Read(buffer, 0, buffer.Length);
var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
Console.WriteLine(line);
}
}
}
编辑:这是我用来测试这个理论的全部工具(也许与你的情况不同会让你大吃一惊)
(LINQPad就绪)
void Main()
{
Task.Factory.StartNew(() => Listener());
_blocker.WaitOne();
Request();
}
public bool _running;
public ManualResetEvent _blocker = new ManualResetEvent(false);
public void Listener()
{
var listener = new HttpListener();
listener.Prefixes.Add("http://localhost:8080/");
listener.Start();
"Listener is listening...".Dump();;
_running = true;
_blocker.Set();
var ctx = listener.GetContext();
"Listener got context".Dump();
ctx.Response.KeepAlive = true;
ctx.Response.ContentType = "application/json";
var outputStream = ctx.Response.OutputStream;
using(var zipStream = new GZipStream(outputStream, CompressionMode.Compress))
using(var writer = new StreamWriter(outputStream))
{
var lineCount = 0;
while(_running && lineCount++ < 10)
{
writer.WriteLine("{ '"foo'": '"bar'"}");
"Listener wrote line, taking a nap...".Dump();
writer.Flush();
Thread.Sleep(1000);
}
}
listener.Stop();
}
public void Request()
{
var endPoint = "http://localhost:8080";
var username = "";
var password = "";
HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
request.Method = "GET";
request.PreAuthenticate = true;
request.Credentials = new NetworkCredential(username, password);
request.AutomaticDecompression = DecompressionMethods.GZip;
request.ContentType = "application/json";
request.Accept = "application/json";
request.Timeout = 30;
request.BeginGetResponse(AsyncCallback, request);
}
public void AsyncCallback(IAsyncResult result)
{
Console.WriteLine("In AsyncCallback");
HttpWebRequest request = result.AsyncState as HttpWebRequest;
using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
{
while(stream.CanRead)
{
var buffer = new byte[2048];
var readCount = stream.Read(buffer, 0, buffer.Length);
var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
Console.WriteLine("Reader got:" + line);
}
}
}
输出:
Listener is listening...
Listener got context
Listener wrote line, taking a nap...
In AsyncCallback
Reader got:{ "foo": "bar"}
Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
这可能与延迟ACK和Nagle算法的结合有关。当服务器连续发送多个小响应时,就会发生这种情况。
在服务器端,发送第一个响应,但只有当服务器从客户端接收到ACK时,或者直到有足够的数据用于发送大数据包(Nagle算法),才发送后续的响应数据块。
在客户端,会接收到响应的第一位,但不会立即发送ACK——由于传统的应用程序具有请求-响应-请求-响应行为,它假设它可以将ACK与下一个请求一起发送——在您的情况下,这不会发生。
在一段固定的时间(500ms?)后,它决定无论如何都发送ACK,从而导致服务器发送它积累的下一个包。
这个问题(如果这确实是您遇到的问题)可以在服务器端的套接字级别通过设置NoDelay
属性、禁用Nagle的算法来解决。我认为你也可以在操作系统范围内禁用它。
您也可以在客户端临时禁用延迟ACK(我知道windows有一个注册表项),看看这是否真的是问题所在,而无需更改服务器上的任何内容。延迟ACK可防止DDOS攻击,因此请确保稍后恢复设置。
减少发送纪念品的频率可能也会有所帮助,但你仍然有机会出现问题。