Parallel.ForEach does not stop with CancellationTokenSource

Keywords: CancellationTokenSource ForEach | Updated: 2023-09-27 18:08:48

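I am currently writing a ProxyChecker library. I use one thread that runs a Parallel.ForEach loop to check all the proxies, and a CancellationTokenSource (cts) for a soft abort (via cts.Cancel()). As you can see in the code below, I added a bit of "test code" that writes the number of currently running threads to the console.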

Here is the relevant code:

private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
        {
            _cts = new CancellationTokenSource();
            int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
            mainThread = new Thread(() =>
            {
                try
                {
                    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
                    {
                        Interlocked.Increment(ref running);
                        Console.WriteLine("thread running: {0}", running);
                        try
                        {
                            _cts.Token.ThrowIfCancellationRequested();
                            if (CheckProxy(prox, domainToCheckWith, timeout))
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Increment(ref goodProxies);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                            }
                            else
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                                Interlocked.Increment(ref badProxies);
                            }
                            _cts.Token.ThrowIfCancellationRequested();
                            OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
                        }
                        catch (OperationCanceledException ex) {}
                        catch (ObjectDisposedException ex) {}
                        catch (Exception ex)
                        {
                            OnLog(ex.Message, Color.Red);
                        }
                        finally
                        {
                            Console.WriteLine("thread running: {0}", running);
                            Interlocked.Decrement(ref running);
                        }
                    });
                }
                catch (OperationCanceledException ex) {}
                catch (ObjectDisposedException ex) {}
                catch (Exception ex)
                {
                    OnLog(ex.Message, Color.Red);
                }
                finally
                {
                    isRunning = false;
                    OnComplete();
                }
            });
            mainThread.Start();
        }

Output (I removed some lines, because the complete log would not be useful):

thread running: 1
thread running: 1
thread running: 2
thread running: 2
//Slowly going up to  50
thread running: 50
thread running: 50
thread running: 50
//Staying at 50 till I press stop
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46
//Going down...
thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4

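Then it stops at 4, 3, or 2 (different every time). I waited several minutes, but the count never dropped any further, and the code after Parallel.ForEach was never executed.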

The request timeout is 5000 ms and the thread count is 50.
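
The symptom above is consistent with how Parallel.ForEach treats its CancellationToken: cancelling stops new iterations from starting, but the loop returns (by throwing OperationCanceledException) only after every iteration that is already running has finished. The following is a minimal sketch of that behavior, not the library's code. CancellationDemo and its parameters are hypothetical names, and Thread.Sleep stands in for a blocking call that never checks the token.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical demo: returns how many iterations were started before the loop ended.
    public static int Run(int items, int degree, int cancelAfterMs, int workMs)
    {
        using (var cts = new CancellationTokenSource())
        {
            int started = 0;
            cts.CancelAfter(cancelAfterMs);
            try
            {
                Parallel.ForEach(
                    Enumerable.Range(0, items),
                    new ParallelOptions { MaxDegreeOfParallelism = degree, CancellationToken = cts.Token },
                    item =>
                    {
                        Interlocked.Increment(ref started);
                        // Stands in for a blocking call; it ignores the token,
                        // so cancellation cannot interrupt it.
                        Thread.Sleep(workMs);
                    });
            }
            catch (OperationCanceledException)
            {
                // Reached only after all in-flight iterations have returned.
            }
            return started;
        }
    }
}
```

Calling CancellationDemo.Run(100, 4, 50, 100) starts only a handful of the 100 iterations, yet the call still waits for the ones already sleeping. If one of those iterations blocked for minutes, the loop would wait for minutes too.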

Here is the rest of the checking code:

private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
    try
    {
        WebRequest req = WebRequest.Create(domainToCheckWith);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;
        var response = (HttpWebResponse) req.GetResponse();
        string responseString = ReadResponseString(response);
        if (responseString.Contains("SOMETHING HERE"))
        {
            OnGoodProxy(proxy);
            return true;
        }
        if (responseString.Contains("SOMEOTHERTHINGHERE"))
        {
            OnBadProxy(proxy);
            return false;
        }
        OnBadProxy(proxy);
        return false;
    }
    catch (WebException ex)
    {
        OnBadProxy(proxy);
        return false;
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
        return false;
    }
}

The stop function:

public void StopChecking()
{
    try
    {
        if (_cts != null && mainThread.IsAlive)
        {
            if (_cts.IsCancellationRequested)
            {
                mainThread.Abort();
                OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
                while (mainThread.IsAlive) ;
                OnComplete();
                isRunning = false;
            }
            else
            {
                _cts.Cancel();
                OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
            }
        }
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
    }
}
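
Two details of the stop function are worth noting: the empty `while (mainThread.IsAlive) ;` loop spins a CPU core while it waits, and Thread.Abort throws PlatformNotSupportedException on .NET Core and .NET 5 or later. A minimal sketch of a cooperative alternative follows. StopHelper and TryStop are hypothetical names that are not part of the original library, and the approach only works if the worker actually observes the token.

```csharp
using System;
using System.Threading;

public static class StopHelper
{
    // Hypothetical helper: requests cancellation, then waits up to 'wait'
    // for the worker thread to exit. Returns true if it exited in time.
    public static bool TryStop(CancellationTokenSource cts, Thread worker, TimeSpan wait)
    {
        if (cts == null) throw new ArgumentNullException(nameof(cts));
        if (worker == null) throw new ArgumentNullException(nameof(worker));

        cts.Cancel();             // cooperative request; running code must check the token
        return worker.Join(wait); // blocks without spinning; false means the timeout elapsed
    }
}
```

A caller could log a warning when TryStop returns false instead of aborting the thread.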

Important edit:

I added this to the CheckProxy function:

        Stopwatch sw = new Stopwatch();
        sw.Start();
        string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
        sw.Stop();

These are the results (elapsed milliseconds) for the last few threads:

thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3

Why does it take so long? I mean, 180 seconds?!


You could try taking a lock on an object inside the try block:

Object lockObject = new Object();
try
{
    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
    {
        Interlocked.Increment(ref running);
        Console.WriteLine("thread running: {0}", running);
        try
        {
            lock(lockObject)
            {
                //code.............
            }
        }
        catch
        {
        }
    });
}
catch
{
}

OK, I figured it out myself.

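I now read the response in chunks and use a Stopwatch (together with request.ReadWriteTimeout) so that the read stops once it exceeds a given time (readTimeout in my case). The code: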

HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith);
req.Proxy = new WebProxy(proxy);
req.Timeout = timeout;               // limit for GetResponse()
req.ReadWriteTimeout = readTimeout;  // limit for each individual stream read
req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip");
req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
byte[] responseByte = new byte[1024];
string responseString = string.Empty;
sw.Start();
using (WebResponse res = req.GetResponse())
{
    using (Stream stream = res.GetResponseStream())
    {
        int bytesRead;
        while ((bytesRead = stream.Read(responseByte, 0, responseByte.Length)) > 0)
        {
            // Decode only the bytes actually read, not the whole buffer.
            responseString += Encoding.UTF8.GetString(responseByte, 0, bytesRead);
            // Give up once the whole read has taken longer than allowed.
            if (sw.ElapsedMilliseconds > (long)timeout)
                throw new WebException();
        }
    }
}
sw.Stop();
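
The read loop above can also be factored into a small helper that takes the overall deadline and a CancellationToken, so a soft abort is noticed between reads. This is a minimal sketch under stated assumptions, not the author's implementation. DeadlineReader and ReadAll are hypothetical names, and the check runs only between reads, so a single blocked Read is still bounded only by ReadWriteTimeout.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;

public static class DeadlineReader
{
    // Hypothetical helper: reads the stream to its end, failing if the total
    // time exceeds 'deadline' or if cancellation is requested between reads.
    public static string ReadAll(Stream stream, TimeSpan deadline, CancellationToken token)
    {
        var buffer = new byte[1024];
        using (var collected = new MemoryStream())
        {
            var sw = Stopwatch.StartNew();
            int bytesRead;
            while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
            {
                collected.Write(buffer, 0, bytesRead);
                token.ThrowIfCancellationRequested();
                if (sw.Elapsed > deadline)
                    throw new TimeoutException("Reading the response exceeded the overall deadline.");
            }
            // Decode once at the end so multi-byte characters split across reads stay intact.
            return Encoding.UTF8.GetString(collected.ToArray());
        }
    }
}
```

With the request code above, this could be called as DeadlineReader.ReadAll(stream, TimeSpan.FromMilliseconds(readTimeout), _cts.Token). To interrupt a read that is already blocked, one option is to register req.Abort with the token. That is a suggestion on top of the original answer rather than something it describes.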