如何在多线程时传递不同的实例
本文关键字:实例 多线程 | 更新日期: 2023-09-27 18:18:15
我正在建一个摩天大楼。我的目标是启动X个浏览器(其中X是线程数),然后通过将每个浏览器的url列表分成X部分来抓取该列表。
我决定使用3个线程(3个浏览器)和10个url列表。
问题:如何像这样在浏览器之间分离每个任务:
-
Browser1将列表中的项目从0刮到3
-
Browser2将列表中的项目从4刮到7
-
Browser3将列表中的项目从8刮到10
所有浏览器应该同时抓取传递的url列表。
我已经有了这个BlockingCollection
:
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public Multithreading(int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Dispose() { _taskQ.CompleteAdding(); }
public void EnqueueTask(Action action) { _taskQ.Add(action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called.
foreach (Action action in _taskQ.GetConsumingEnumerable())
action(); // Perform task.
}
public int ItemsCount()
{
return _taskQ.Count;
}
可以这样使用:
Multithreading multithread = new Multithreading(3); //3 threads
foreach(string url in urlList){
multithread.EnqueueTask(new Action(() =>
{
startScraping(browser1); //or browser2 or browser3
}));
}
我需要在抓取之前创建浏览器实例,因为我不想每个线程都启动一个新的浏览器。
考虑到Henk Holtermans的评论,你可能想要最大的速度,即尽可能让浏览器繁忙,使用这个:
private static void StartScraping(int id, IEnumerable<Uri> urls)
{
// Construct browser here
foreach (Uri url in urls)
{
// Use browser to process url here
Console.WriteLine("Browser {0} is processing url {1}", id, url);
}
}
在主要:
int nrWorkers = 3;
int nrUrls = 10;
BlockingCollection<Uri> taskQ = new BlockingCollection<Uri>();
foreach (int i in Enumerable.Range(0, nrWorkers))
{
Task.Run(() => StartScraping(i, taskQ.GetConsumingEnumerable()));
}
foreach (int i in Enumerable.Range(0, nrUrls))
{
taskQ.Add(new Uri(String.Format("http://Url{0}", i)));
}
taskQ.CompleteAdding();
我认为通常的方法是有一个阻塞队列,一个提供者线程和一个任意的工作池。
提供者线程负责将url添加到队列。当没有可添加的元素时,会阻塞。
一个工作线程实例化一个浏览器,然后从队列中检索一个URL,擦除它,然后循环返回更多。当队列为空时阻塞。
你可以随便安排多少工人,他们会在他们之间进行排序。
主线启动所有线程并退到场边。如果有UI的话,它会照看UI。
多线程真的很难调试。您可能希望至少在部分作业中使用Tasks。
您可以给任务和工人一些Id
。然后你会得到BlockingCollection[]
而不是BlockingCollection
。每个消费者将从其自己的BlockingCollection
中从数组中消费。我们的工作是找到合适的消费者并发布工作。
BlockingCollection<Action>[] _taskQ;
private int taskCounter = -1;
public Multithreading(int workerCount)
{
_taskQ = new BlockingCollection<Action>[workerCount];
for (int i = 0; i < workerCount; i++)
{
int workerId = i;//To avoid closure issue
_taskQ[workerId] = new BlockingCollection<Action>();
Task.Factory.StartNew(()=> Consume(workerId));
}
}
public void EnqueueTask(Action action)
{
int value = Interlocked.Increment(ref taskCounter);
int index = value / 4;//Your own logic to find the index here
_taskQ[index].Add(action);
}
void Consume(int workerId)
{
foreach (Action action in _taskQ[workerId].GetConsumingEnumerable())
action();// Perform task.
}
一个使用后台worker的简单解决方案可以限制线程数:
public class Scraper : IDisposable
{
private readonly BlockingCollection<Action> tasks;
private readonly IList<BackgroundWorker> workers;
public Scraper(IList<Uri> urls, int numberOfThreads)
{
for (var i = 0; i < urls.Count; i++)
{
var url = urls[i];
tasks.Add(() => Scrape(url));
}
for (var i = 0; i < numberOfThreads; i++)
{
var worker = new BackgroundWorker();
worker.DoWork += (sender, args) =>
{
Action task;
while (tasks.TryTake(out task))
{
task();
}
};
workers.Add(worker);
worker.RunWorkerAsync();
}
}
public void Scrape(Uri url)
{
Console.WriteLine("Scraping url {0}", url);
}
public void Dispose()
{
throw new NotImplementedException();
}
}