对concurrentQueue的误解,单个消费者在自己的线程上从队列中工作

本文关键字:线程 队列 工作 自己的 concurrentQueue 误解 消费者 单个 | 更新日期: 2023-09-27 18:06:50

我在创建一个功能正常的SystemFileWatcher时遇到了麻烦,该系统将创建的事件存储在队列中,以便单独的线程工作。我在这里读了无数关于这个问题的帖子,但我无法理解这个特殊的问题。

using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;
namespace FileSystemWatcherTest
{
    class Program
    {
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
    static void Main(string[] args)
    {
        string path = @"C:'test'";
        FileSystemWatcher watcher = new FileSystemWatcher();
        watcher.Path = path;
        watcher.EnableRaisingEvents = true;
        watcher.Filter = "*.*";
        watcher.Created += new FileSystemEventHandler(onCreated);
        Thread Consumer = new Thread(new ThreadStart(mover));
        Consumer.Start();

        while (true) ;//run infinite loop so program doesn't terminate untill we force it.
    }
    static void onCreated(object sender, FileSystemEventArgs e)
    {
        processCollection.Add(e.FullPath);     
    }
    static void mover()
    {
        string current;
        string processed = @"C:'test'processed'";
        while (true)
        {
            while (processCollection.IsCompleted)
            {
                Thread.Sleep(1000);
            }
            while (processCollection.TryTake(out current))
            {
                System.IO.File.Move(current, processed);
            }
        }
    }
}

}

这就是我要测试的。我知道这行不通。当文件被放入队列中时,我已经验证了FSW可以工作。当我试图在自己的线程中启动移动函数时,我的问题开始了。一旦我开始处理队列,mover函数和onCreated似乎没有通信。

我对这段代码的期望是在自己的线程中启动mover函数,并与SFW一起运行。我的期望是,连接到blockingcollection的concurrentqueue会自动更新(我通过onCreated对一个项目进行排队,移动程序看到它现在对该队列有+1。)移动器从队列中取出一个,onCreated会看到这个。)我可能正在使用线程。睡眠不正确。我不再有支持使用blockingcollection的理由(我最初选择它来处理等待队列填满的问题,基本上,不断检查队列以处理一个项目),并且我愿意将其更改为任何可能工作的内容。我已经看到了锁的使用,但据我所知,由于concurrentQueue的同步方式,这并不是真正必要的。

最终目标是处理大量随机传入的小文件,在任何给定时间可以从1到数百个不等。这些文件是。emls。

如果可能的话,我希望你能解释一下发生了什么,并提出解决这个问题的建议。我谦卑地来到这里,希望被告知我所理解的一切都是错误的!

编辑:我正在测试这个作为一个控制台应用程序,但它将被用作服务之后。我添加了while (true);

对concurrentQueue的误解,单个消费者在自己的线程上从队列中工作

您的代码示例中有几个不同的问题:

  1. 您正在滥用File.Move()方法。它要求两个参数都是完整的文件名称。您正在传递目录名作为第二个参数,这是不正确的。
  2. 您正在检查集合的IsCompleted属性,好像这将是有用的。它总是false,所以代码块什么都不做。这就引出了下一个问题。
  3. 你的线程在一个紧循环中运行,消耗了大量的CPU时间。这可能会也可能不会导致错误,但它可能会有所帮助。FileSystemWatcher实际上并不能保证总是报告更改,其中一个原因可能是它无法获得足够的CPU时间来监视文件系统。如果您通过耗尽所有CPU时间来饿死它,您可能会发现它根本不报告任何更改。注意,这个问题也存在于你的主线程中;它也在一个紧密循环中运行,什么都不做,消耗了大量的CPU时间。所以你完全占据了系统的两个核心。
  4. 你没有充分利用BlockingCollection设计的生产者/消费者执行模型。你应该让你的工作线程枚举GetConsumingEnumerable()返回的枚举,使用CompleteAdding()方法通知该线程没有更多的工作。

下面是你的代码示例的一个版本,它纠正了上述错误,并清理了示例,使其更独立:

// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();
static void Main(string[] args)
{
    string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");
    Console.WriteLine("Creating directory: '"{0}'"", testDirectory);
    Directory.CreateDirectory(testDirectory);
    FileSystemWatcher watcher = new FileSystemWatcher();
    watcher.Path = testDirectory;
    watcher.EnableRaisingEvents = true;
    watcher.Filter = "*.*";
    watcher.Created += new FileSystemEventHandler(onCreated);
    Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
    Consumer.Start(testDirectory);
    string text;
    while ((text = Console.ReadLine()) != "")
    {
        string newFile = Path.Combine(testDirectory, text + ".txt");
        File.WriteAllText(newFile, "Test file");
    }
    processCollection.CompleteAdding();
}
static void onCreated(object sender, FileSystemEventArgs e)
{
    if (e.ChangeType == WatcherChangeTypes.Created)
    {
        processCollection.Add(e.FullPath);
    }
}
static void mover(object testDirectory)
{
    string processed = Path.Combine((string)testDirectory, "processed");
    Console.WriteLine("Creating directory: '"{0}'"", processed);
    Directory.CreateDirectory(processed);
    foreach (string current in processCollection.GetConsumingEnumerable())
    {
        // Ensure that the file is in fact a file and not something else.
        if (File.Exists(current))
        {
            System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
        }
    }
}