Parallel.ForEach with CancellationTokenSource does not stop
Keywords: CancellationTokenSource, ForEach | Updated: 2023-09-27 18:08:48
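I'm currently writing a ProxyChecker library. I use a thread that runs a Parallel.ForEach loop to check all the proxies. I use a CancellationTokenSource (cts) for a soft abort (with cts.Cancel()). As you can see in the code below, I added a bit of "test code" that writes the number of currently running threads to the console.
Here is the relevant code: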
private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
{
    _cts = new CancellationTokenSource();
    int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
    mainThread = new Thread(() =>
    {
        try
        {
            Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
            {
                Interlocked.Increment(ref running);
                Console.WriteLine("thread running: {0}", running);
                try
                {
                    _cts.Token.ThrowIfCancellationRequested();
                    if (CheckProxy(prox, domainToCheckWith, timeout))
                    {
                        Interlocked.Increment(ref checkedProxyCount);
                        Interlocked.Increment(ref goodProxies);
                        Interlocked.Decrement(ref uncheckedProxyCount);
                    }
                    else
                    {
                        Interlocked.Increment(ref checkedProxyCount);
                        Interlocked.Decrement(ref uncheckedProxyCount);
                        Interlocked.Increment(ref badProxies);
                    }
                    _cts.Token.ThrowIfCancellationRequested();
                    OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
                }
                catch (OperationCanceledException ex) {}
                catch (ObjectDisposedException ex) {}
                catch (Exception ex)
                {
                    OnLog(ex.Message, Color.Red);
                }
                finally
                {
                    Console.WriteLine("thread running: {0}", running);
                    Interlocked.Decrement(ref running);
                }
            });
        }
        catch (OperationCanceledException ex) {}
        catch (ObjectDisposedException ex) {}
        catch (Exception ex)
        {
            OnLog(ex.Message, Color.Red);
        }
        finally
        {
            isRunning = false;
            OnComplete();
        }
    });
    mainThread.Start();
}
Output (I removed some lines, since posting the full output would be pointless):
thread running: 1
thread running: 1
thread running: 2
thread running: 2
//Slowly going up to 50
thread running: 50
thread running: 50
thread running: 50
//Staying at 50 till I press stop
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46
//Going down...
thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4
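Then it stops at 4, 3, or 2 (different each time). I waited several minutes, but the count never went lower, and the code after Parallel.ForEach was never executed.
The request timeout is 5000 and the thread count is 50.
Here is the rest of the checking code: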
private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
    try
    {
        WebRequest req = WebRequest.Create(domainToCheckWith);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;
        var response = (HttpWebResponse) req.GetResponse();
        string responseString = ReadResponseString(response);
        if (responseString.Contains("SOMETHING HERE"))
        {
            OnGoodProxy(proxy);
            return true;
        }
        if (responseString.Contains("SOMEOTHERTHINGHERE"))
        {
            OnBadProxy(proxy);
            return false;
        }
        OnBadProxy(proxy);
        return false;
    }
    catch (WebException ex)
    {
        OnBadProxy(proxy);
        return false;
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
        return false;
    }
}
The stop function:
public void StopChecking()
{
    try
    {
        if (_cts != null && mainThread.IsAlive)
        {
            if (_cts.IsCancellationRequested)
            {
                mainThread.Abort();
                OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
                while (mainThread.IsAlive) ;
                OnComplete();
                isRunning = false;
            }
            else
            {
                _cts.Cancel();
                OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
            }
        }
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
    }
}
Important edit:
I added this to the CheckProxy function:
Stopwatch sw = new Stopwatch();
sw.Start();
string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
sw.Stop();
These are the results (in milliseconds) for the last few threads:
thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3
Why does it take so long? I mean, 180 seconds?!
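A likely explanation for the long reads: HttpWebRequest.Timeout only limits GetResponse() (and GetRequestStream()), not the reads on the response stream afterwards. Those are governed by ReadWriteTimeout, which defaults to 300,000 ms, and the limit applies to each individual read, so a proxy that trickles data can keep ReadToEnd() busy for a long time. Below is a minimal sketch of setting both limits. The class name, method name, and example URL are made up for illustration, and on recent .NET versions WebRequest.Create compiles with an obsolescence warning.

```csharp
using System;
using System.Net;

public static class TimeoutSetup
{
    // Hypothetical helper: builds a request with both timeouts set explicitly.
    // Nothing is sent over the network until GetResponse() is called.
    public static HttpWebRequest Build(string url, int responseTimeoutMs, int readTimeoutMs)
    {
        var req = (HttpWebRequest)WebRequest.Create(url);
        req.Timeout = responseTimeoutMs;       // applies to GetResponse()
        req.ReadWriteTimeout = readTimeoutMs;  // applies to each read on the response stream
        return req;
    }

    public static void Main()
    {
        // Print the defaults of an untouched request, then the explicit values.
        var untouched = (HttpWebRequest)WebRequest.Create("http://example.com/");
        Console.WriteLine("Default Timeout: {0} ms", untouched.Timeout);
        Console.WriteLine("Default ReadWriteTimeout: {0} ms", untouched.ReadWriteTimeout);

        var req = Build("http://example.com/", 5000, 5000);
        Console.WriteLine("Timeout: {0} ms, ReadWriteTimeout: {1} ms", req.Timeout, req.ReadWriteTimeout);
    }
}
```

Even with ReadWriteTimeout set, the total read time is not bounded, because every successful read restarts the clock.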
You could try locking on an object inside the try block:
Object lockObject = new Object();
try
{
    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
    {
        Interlocked.Increment(ref running);
        Console.WriteLine("thread running: {0}", running);
        try
        {
            lock(lockObject)
            {
                //code.............
            }
        }
        catch
        {
        }
    });
}
catch
{
}
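For context on the loop above: cancelling the token passed through ParallelOptions only prevents new iterations from starting. Parallel.ForEach still waits for the iterations that are already running before it throws OperationCanceledException, so a body stuck in a blocking call keeps the loop from returning. Here is a minimal sketch of that behavior, with Thread.Sleep standing in for the blocking web request. The class name, method name, and timings are assumptions chosen for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Runs a parallel loop whose body blocks for workMs, requests cancellation
    // after cancelAfterMs, and returns how long Parallel.ForEach took to return.
    public static long RunAndCancel(int items, int workMs, int cancelAfterMs)
    {
        using (var cts = new CancellationTokenSource())
        {
            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = 4,
                CancellationToken = cts.Token
            };
            var sw = Stopwatch.StartNew();
            cts.CancelAfter(cancelAfterMs);
            try
            {
                Parallel.ForEach(new int[items], options, item =>
                {
                    // Stand-in for a blocking call that never looks at the token.
                    Thread.Sleep(workMs);
                });
            }
            catch (OperationCanceledException)
            {
                // Reached only once the in-flight iterations have finished.
            }
            return sw.ElapsedMilliseconds;
        }
    }

    public static void Main()
    {
        long elapsed = RunAndCancel(100, 1000, 200);
        Console.WriteLine("Loop returned after {0} ms", elapsed);
    }
}
```

Cancellation is requested after 200 ms, but the loop should return only after roughly one second, once the sleeping iterations are done. It does not run all 100 items, which would take about 25 seconds with four workers.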
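OK, I figured it out myself.
I now read the response incrementally and use a Stopwatch (together with request.ReadWriteTimeout) to stop the read once it exceeds a certain time (readTimeout in my case). The code: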
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith);
req.Proxy = new WebProxy(proxy);
req.Timeout = timeout;                // limit for GetResponse()
req.ReadWriteTimeout = readTimeout;   // limit for each individual stream read
req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip");
req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
byte[] responseByte = new byte[1024];
string responseString = string.Empty;
sw.Start();   // sw is the Stopwatch declared in the earlier edit
using (WebResponse res = req.GetResponse())
{
    using (Stream stream = res.GetResponseStream())
    {
        int bytesRead;
        while ((bytesRead = stream.Read(responseByte, 0, responseByte.Length)) > 0)
        {
            // Decode only the bytes actually read, not the whole buffer.
            responseString += Encoding.UTF8.GetString(responseByte, 0, bytesRead);
            // Give up once the total elapsed time exceeds the limit.
            if(sw.ElapsedMilliseconds > (long)timeout)
                throw new WebException();
        }
    }
}
sw.Stop();
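The same idea can be pulled into a small helper that also watches a CancellationToken, so a soft abort takes effect between reads. This is only a sketch under assumptions: the class and method names are hypothetical, and it collects the raw bytes first and decodes once at the end so that a multi-byte UTF-8 character split across two reads is not corrupted.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;

public static class DeadlineReader
{
    // Reads the stream to the end, but gives up once maxTotal has elapsed or
    // the token is cancelled. The checks run between Read calls, so a single
    // blocked Read is still limited only by the stream's own timeout.
    public static string ReadAll(Stream stream, TimeSpan maxTotal, CancellationToken token)
    {
        var sw = Stopwatch.StartNew();
        var buffer = new byte[1024];
        using (var collected = new MemoryStream())
        {
            int read;
            while ((read = stream.Read(buffer, 0, buffer.Length)) > 0)
            {
                collected.Write(buffer, 0, read);
                token.ThrowIfCancellationRequested();
                if (sw.Elapsed > maxTotal)
                    throw new TimeoutException("Reading the response took too long.");
            }
            // Decode once, after all bytes have arrived.
            return Encoding.UTF8.GetString(collected.ToArray());
        }
    }

    public static void Main()
    {
        // A MemoryStream stands in for the response stream here.
        byte[] data = Encoding.UTF8.GetBytes(new string('x', 3000));
        using (var source = new MemoryStream(data))
        {
            string text = ReadAll(source, TimeSpan.FromSeconds(5), CancellationToken.None);
            Console.WriteLine(text.Length); // prints 3000
        }
    }
}
```

In CheckProxy this would be called with the stream from GetResponseStream() and _cts.Token, and the resulting TimeoutException or OperationCanceledException would be handled where WebException is handled now.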