对concurrentQueue的误解,单个消费者在自己的线程上从队列中工作
本文关键字:线程 队列 工作 自己的 concurrentQueue 误解 消费者 单个 | 更新日期: 2023-09-27 18:06:50
我在创建一个功能正常的SystemFileWatcher时遇到了麻烦,该系统将创建的事件存储在队列中,以便单独的线程工作。我在这里读了无数关于这个问题的帖子,但我无法理解这个特殊的问题。
using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;
namespace FileSystemWatcherTest
{
class Program
{
public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
static void Main(string[] args)
{
string path = @"C:'test'";
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = path;
watcher.EnableRaisingEvents = true;
watcher.Filter = "*.*";
watcher.Created += new FileSystemEventHandler(onCreated);
Thread Consumer = new Thread(new ThreadStart(mover));
Consumer.Start();
while (true) ;//run infinite loop so program doesn't terminate untill we force it.
}
static void onCreated(object sender, FileSystemEventArgs e)
{
processCollection.Add(e.FullPath);
}
static void mover()
{
string current;
string processed = @"C:'test'processed'";
while (true)
{
while (processCollection.IsCompleted)
{
Thread.Sleep(1000);
}
while (processCollection.TryTake(out current))
{
System.IO.File.Move(current, processed);
}
}
}
}
}
这就是我要测试的。我知道这行不通。当文件被放入队列中时,我已经验证了FSW可以工作。当我试图在自己的线程中启动移动函数时,我的问题开始了。一旦我开始处理队列,mover函数和onCreated似乎没有通信。
我对这段代码的期望是在自己的线程中启动mover函数,并与SFW一起运行。我的期望是,连接到blockingcollection的concurrentqueue会自动更新(我通过onCreated对一个项目进行排队,移动程序看到它现在对该队列有+1。)移动器从队列中取出一个,onCreated会看到这个。)我可能正在使用线程。睡眠不正确。我不再有支持使用blockingcollection的理由(我最初选择它来处理等待队列填满的问题,基本上,不断检查队列以处理一个项目),并且我愿意将其更改为任何可能工作的内容。我已经看到了锁的使用,但据我所知,由于concurrentQueue的同步方式,这并不是真正必要的。
最终目标是处理大量随机传入的小文件,在任何给定时间可以从1到数百个不等。这些文件是。emls。
如果可能的话,我希望你能解释一下发生了什么,并提出解决这个问题的建议。我谦卑地来到这里,希望被告知我所理解的一切都是错误的!
编辑:我正在测试这个作为一个控制台应用程序,但它将被用作服务之后。我添加了while (true);
您的代码示例中有几个不同的问题:
- 您正在滥用
File.Move()
方法。它要求两个参数都是完整的文件名称。您正在传递目录名作为第二个参数,这是不正确的。 - 您正在检查集合的
IsCompleted
属性,好像这将是有用的。它总是false
,所以代码块什么都不做。这就引出了下一个问题。 你的线程在一个紧循环中运行,消耗了大量的CPU时间。这可能会也可能不会导致错误,但它可能会有所帮助。 - 你没有充分利用
BlockingCollection
设计的生产者/消费者执行模型。你应该让你的工作线程枚举GetConsumingEnumerable()
返回的枚举,使用CompleteAdding()
方法通知该线程没有更多的工作。
FileSystemWatcher
实际上并不能保证总是报告更改,其中一个原因可能是它无法获得足够的CPU时间来监视文件系统。如果您通过耗尽所有CPU时间来饿死它,您可能会发现它根本不报告任何更改。注意,这个问题也存在于你的主线程中;它也在一个紧密循环中运行,什么都不做,消耗了大量的CPU时间。所以你完全占据了系统的两个核心。下面是你的代码示例的一个版本,它纠正了上述错误,并清理了示例,使其更独立:
// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();
static void Main(string[] args)
{
string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");
Console.WriteLine("Creating directory: '"{0}'"", testDirectory);
Directory.CreateDirectory(testDirectory);
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = testDirectory;
watcher.EnableRaisingEvents = true;
watcher.Filter = "*.*";
watcher.Created += new FileSystemEventHandler(onCreated);
Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
Consumer.Start(testDirectory);
string text;
while ((text = Console.ReadLine()) != "")
{
string newFile = Path.Combine(testDirectory, text + ".txt");
File.WriteAllText(newFile, "Test file");
}
processCollection.CompleteAdding();
}
static void onCreated(object sender, FileSystemEventArgs e)
{
if (e.ChangeType == WatcherChangeTypes.Created)
{
processCollection.Add(e.FullPath);
}
}
static void mover(object testDirectory)
{
string processed = Path.Combine((string)testDirectory, "processed");
Console.WriteLine("Creating directory: '"{0}'"", processed);
Directory.CreateDirectory(processed);
foreach (string current in processCollection.GetConsumingEnumerable())
{
// Ensure that the file is in fact a file and not something else.
if (File.Exists(current))
{
System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
}
}
}