有效地利用CPU空闲时间进行并行URL处理

本文关键字:并行 URL 处理 时间 CPU 有效地 | 更新日期: 2023-09-27 18:12:08

我们在生产服务器中有一个作业服务,它在计时器的帮助下每隔2分钟触发一次。
该服务从数据库收集20个任务,然后创建20个url(相同的主机,不同的参数);使用线程池线程并行触发它们,并等待响应。
url是循环返回的。即,目标网站也托管在同一服务器。

流程是:

1. If task is simple, URL response will get back to job service within seconds. 
   But job service has to wait for another 2 minutes to pick next 20 jobs. 
   CPU will be idle here. 
   How we can efficiently utilize this CPU idle time for processing more jobs…?
2. If task is long running, job service will wait max 2 minutes 
   for the response, if not receiving a response; service will 
   pick next 20 jobs to process. As a result jobs will be queued up 
   and CPU usage will go very high. 
   How do we prevent such a situation….?


除了这个计时器和定期挑选作业之外,我们还有其他有效的方法来处理作业吗?
比如监控IIS工作进程和CPU使用情况,基于此选择作业并处理它们.....
如果是这样,我们如何通过使用c#代码......来监控IIS工作进程和CPU使用情况?

或者其他的想法.....谢谢。

更新:创建并行线程的代码片段:

public class ParallelProcess
  {
    #region "Instance constructor"
    public ParallelProcess()
    {
      //Handle unhandled execeptions, throws from threads.
      AppDomain.CurrentDomain.UnhandledException += OnUnhandledException;
    }
    #endregion
#region "Private Fields"
#region "Static"
//Number of threads to be activated to process each job request.
private static int MaxConcurrentThreads = 0;
//Minimum number of idle threads that should exist in the pool.
private static int ThreadBuffer = 0;
//Time out in ms for the theadpool to wait on all threads to complete.
private static int ThreadWaitTimeOut = 0;
//Available Worker Threads in thread pool
private static int workerThreads = 0;
//Available Port Threads in thread pool
private static int completionThreads = 0;
//Minimum Worker Threads available in ThreadPool
private static int minW = 0;
//Minimum Port Threads available in ThreadPool
#endregion
private static int minC = 0;
#endregion
 #region "static constructor"
    static ParallelProcess()
    {
      try
      {
        //Required threads defined in C:'HobooAppServer_Config'hobooAppServer_config.xml
        MaxConcurrentThreads = Configuration.RequiredThread;
        ThreadBuffer =Configuration.ThreadBuffer;
        //In milliseconds
        ThreadWaitTimeOut =Configuration.TRWaitTime;
         //In milliseconds
         //Get min number of threads from thread pool.
         ThreadPool.GetMinThreads(minW, minC);
         //set the thead limit to spawn in the current thread pool.
         if (minW >= MaxConcurrentThreads)
         {
          ThreadPool.SetMinThreads(MaxConcurrentThreads, minC);
          minW = MaxConcurrentThreads;
         }
        else
        {
          ThreadPool.SetMinThreads(minW, minC);
        }
        }
      catch (Exception ex)
      {
        //Write exception to log file.
        WriteJobLog(new JobLogDTO
        {
          Mode = "Parallel",
          UniqueId = "Thread Pool Exception",
          ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
          StartTime = DateTime.Now.ToString(),
          ExceptionOrResult = ex.ToString()
        });
      }
    }
    #endregion

#region "public methods"
/// <summary>
/// Gets the count of rows to be retrieved from job.
/// This takes into account the current work load and available threads to process.
/// </summary>
/// <returns>int (Number of threads currently available)</returns>
private int GetMaxItemsRetrievalCount()
{
  int rtnVal = 1;
  try
  {
    //Get Available idle threads currently in the thead pool.
    ThreadPool.GetAvailableThreads(workerThreads, completionThreads);
    rtnVal = workerThreads > MaxConcurrentThreads ? MaxConcurrentThreads : workerThreads;
    rtnVal = rtnVal > 0 ? rtnVal : 0;
  }
  catch (Exception ex)
  {
    //Write exceptions to log file.
    WriteJobLog(new JobLogDTO
    {
      Mode = "Parallel",
      UniqueId = "GetMaxItemsRetrievalCount Exception",
      ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
      StartTime = DateTime.Now.ToString(),
      ExceptionOrResult = ex.ToString()
    });
  }
  return rtnVal;
}
/// <summary>
/// The method which processes jobs on worker threads.
/// </summary>
public void ProcessBatchJobs(bool pIsNight, bool plPriority, string pUniqueId)
{
  bool isContinue = true;
  int maxRecordCount = 0;
  ManualResetEvent[] signalEvents = null;
  do
  {
    maxRecordCount = GetMaxItemsRetrievalCount();
    if (maxRecordCount > 0)
    {
      //Pick jobs from database
      List<SnapShotTask> Jobs =Business.Rtds.SnapShot.PickTasks(pIsNight, plPriority);
      if (Jobs != null && Jobs.Count > 0)
      {
        //Log Header-Thread Pool Information And Statistics - In Parallel Threads
        WriteJobLog(new JobLogDTO
        {
          Mode = "Parallel",
          UniqueId = pUniqueId,
          ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
          StartTime = DateTime.Now.ToString(),
          AvblWorkerThread = workerThreads.ToString(),
          AvblPortThread = completionThreads.ToString(),
          AcqrdWorkerThread = minW.ToString(),
          AcqurdPortThread = minC.ToString(),
          JobsToProcess = Jobs.Count.ToString()
        });
        signalEvents = new ManualResetEvent[Jobs.Count];
        int signalCount = 0;
        //Loop through each job, create call back function, add items to queue, fire them
        foreach (SnapShotTask job in Jobs)
        {
          signalEvents(signalCount) = new ManualResetEvent(false);
          BatchCallBack threadPoolCallBack = 
           new BatchCallBack(job, signalEvents(signalCount));
          bool nResult = 
           ThreadPool.QueueUserWorkItem(
           new WaitCallback(threadPoolCallBack.ThreadPoolCallback),
           new BatchThreadData
          {
            IsNight = pIsNight,
            UniqueId = pUniqueId
          });
          signalCount += 1;
        }
        //Wait here untill child threads finish the job or timeout meets.
        bool result = WaitHandle.WaitAll(signalEvents, ParallelProcess.ThreadWaitTimeOut);
        //When one or more threads have not set the signal.
        if (result == false)
        {
          //Logger.Write("Not all threads completed in the pool. 
          //The pool exited due to time-out.");
        }
        else
        {
          //Logging time taken per batch.
          //Logger.Write(string.Format("Average Time taken for each batch of {1} orders : {0} ms",
          //New TimeSpan((ticksEnd - ticksStart)).TotalMilliseconds.ToString());
        }
        //ticksEnd = DateTime.Now.Ticks
      }
      else
      {
        //TODO : Retry Logic
        //Nothing to process.
        isContinue = false;
      }
    }
    else
    {
      //We did not get a thread to execute. So wait for a free thread(s).
      Thread.Sleep(1000);
    }
  } while (isContinue);
  //end time after batch is done.
  //endTime = DateTime.Now
  //Log the over-all time taken.
}
/// <summary>
/// Log unhandled exceptions from this application domain if any.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void OnUnhandledException(object sender, UnhandledExceptionEventArgs e)
{
  Exception ex = e.ExceptionObject as Exception;
  //Write unhandled exception to log file.
  WriteJobLog(new JobLogDTO
  {
    Mode = "Parallel",
    UniqueId = "UnhandledException",
    ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
    StartTime = DateTime.Now.ToString(),
    ExceptionOrResult = ex.ToString()
  });
}
#endregion
}

有效地利用CPU空闲时间进行并行URL处理

您不需要监视CPU负载,因为操作系统会为您做这些。它将根据线程的优先级调度线程。

将任务线程设置为较低的优先级。这样做,这些任务将被延迟,直到CPU有空闲时间来执行不太重要的线程。

设置优先级:

Thread.CurrentThread.ThreadPriority = ThreadPriority.BelowNormal; // Or ThreadPriority.Lowest.