如何在多线程时传递不同的实例

本文关键字:实例 多线程 | 更新日期: 2023-09-27 18:18:15

我正在建一个摩天大楼。我的目标是启动X个浏览器(其中X是线程数),然后通过将每个浏览器的url列表分成X部分来抓取该列表。

我决定使用3个线程(3个浏览器)和10个url列表。

问题:如何像这样在浏览器之间分离每个任务:

  1. Browser1将列表中的项目从0刮到3

  2. Browser2将列表中的项目从4刮到7

  3. Browser3将列表中的项目从8刮到10

所有浏览器应该同时抓取传递的url列表。

我已经有了这个BlockingCollection:

BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public Multithreading(int workerCount)
{
        // Create and start a separate Task for each consumer:
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume);
}
public void Dispose() { _taskQ.CompleteAdding(); }
public void EnqueueTask(Action action) { _taskQ.Add(action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called. 
foreach (Action action in _taskQ.GetConsumingEnumerable())
            action();     // Perform task.
}
public int ItemsCount()
{
        return _taskQ.Count;
}

可以这样使用:

Multithreading multithread = new Multithreading(3); //3 threads
foreach(string url in urlList){
    multithread.EnqueueTask(new Action(() => 
    {
         startScraping(browser1); //or browser2 or browser3
    }));
}

我需要在抓取之前创建浏览器实例,因为我不想每个线程都启动一个新的浏览器。

如何在多线程时传递不同的实例

考虑到Henk Holtermans的评论,你可能想要最大的速度,即尽可能让浏览器繁忙,使用这个:

private static void StartScraping(int id, IEnumerable<Uri> urls)
{
    // Construct browser here
    foreach (Uri url in urls)
    {
        // Use browser to process url here
        Console.WriteLine("Browser {0} is processing url {1}", id, url);
    }
}
在主要

:

    int nrWorkers = 3;
    int nrUrls = 10;
    BlockingCollection<Uri> taskQ = new BlockingCollection<Uri>();
    foreach (int i in Enumerable.Range(0, nrWorkers))
    {
        Task.Run(() => StartScraping(i, taskQ.GetConsumingEnumerable()));
    }
    foreach (int i in Enumerable.Range(0, nrUrls))
    {
        taskQ.Add(new Uri(String.Format("http://Url{0}", i)));
    }
    taskQ.CompleteAdding();

我认为通常的方法是有一个阻塞队列,一个提供者线程和一个任意的工作池。

提供者线程负责将url添加到队列。当没有可添加的元素时,会阻塞。

一个工作线程实例化一个浏览器,然后从队列中检索一个URL,擦除它,然后循环返回更多。当队列为空时阻塞。

你可以随便安排多少工人,他们会在他们之间进行排序。

主线启动所有线程并退到场边。如果有UI的话,它会照看UI。

多线程真的很难调试。您可能希望至少在部分作业中使用Tasks。

您可以给任务和工人一些Id。然后你会得到BlockingCollection[]而不是BlockingCollection。每个消费者将从其自己的BlockingCollection中从数组中消费。我们的工作是找到合适的消费者并发布工作。

BlockingCollection<Action>[] _taskQ;
private int taskCounter = -1;
public Multithreading(int workerCount)
{
    _taskQ = new BlockingCollection<Action>[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
        int workerId = i;//To avoid closure issue
        _taskQ[workerId] = new BlockingCollection<Action>();
        Task.Factory.StartNew(()=> Consume(workerId));
    }
}
public void EnqueueTask(Action action)
{
    int value = Interlocked.Increment(ref taskCounter);
    int index = value / 4;//Your own logic to find the index here    
    _taskQ[index].Add(action);
}
void Consume(int workerId)
{
    foreach (Action action in _taskQ[workerId].GetConsumingEnumerable())
       action();// Perform task.
}

一个使用后台worker的简单解决方案可以限制线程数:

public class Scraper : IDisposable
{
    private readonly BlockingCollection<Action> tasks;
    private readonly IList<BackgroundWorker> workers;
    public Scraper(IList<Uri> urls, int numberOfThreads)
    {
        for (var i = 0; i < urls.Count; i++)
        {
            var url = urls[i];
            tasks.Add(() => Scrape(url));
        }
        for (var i = 0; i < numberOfThreads; i++)
        {
            var worker = new BackgroundWorker();
            worker.DoWork += (sender, args) =>
            {
                Action task;
                while (tasks.TryTake(out task))
                {
                    task();
                }
            };
            workers.Add(worker);
            worker.RunWorkerAsync();
        }
    }
    public void Scrape(Uri url)
    {
        Console.WriteLine("Scraping url {0}", url);
    }
    public void Dispose()
    {
        throw new NotImplementedException();
    }
}