在c#中运行多个任务

本文关键字:任务 运行 | 更新日期: 2023-09-27 18:09:00

我正在开发一个实时软件,我有一些非实时功能。

规格:

  • 需要实时执行method1;
  • 需要每60分钟做一次method2

我应该使用多线程吗?任务吗?

现在我正在使用计时器,但我不认为这是一个很好的用途。

在c#中运行多个任务

在文件夹中出现文件时立即启动文件处理通常不是一个好主意。例如,如果一次到达100个文件,那么最终会有100个线程都在争夺相同的物理资源,例如磁盘磁头等。在我看来,最好的方法是有一个实时,或者至少是用。net通知存在一个新文件,然后使用生产者-消费者队列

处理这些文件。

最好使用线程安全的集合,如ConcurrentQueue和长时间运行的活动线程数,例如,在任何时候通过同步机制(如SemaphoreSlim)控制发送邮件给正在处理的线程

使用FileSystemWatcher类可以轻松实现目录中新创建文件的通知。理想情况下,以基于时间的方式执行处理应该使用观察者模式

下面我创建了一个简单的应用程序来演示这些概念,这应该会有所帮助。

应用程序由许多关键类组成

  1. SingletonBase类,它实现了应该只实例化一次的类的单例模式,例如filesystemwatcher类
  2. FileSystemMonitor类,它监视目录中正在创建的新文件,并且立即通知处理队列有新文件存在。当然,您可以很容易地修改它以开始立即处理文件,但如上所述,这通常是一个坏主意
  3. 处理队列访问和相关任务同步的FilesWorker类。

    using System;
    using System.Collections.Concurrent;
    using System.Globalization;
    using System.Reactive.Linq;
    using System.Reflection;
    using System.Threading;
    using System.Threading.Tasks;
    using System.IO;
    using System.Security.Permissions;
    
    namespace ConsoleApplication9
    {
        internal class Program
        {
            private static void Main(string[] args)
            {
    
                const string directorytowatch = @"d:'junk'watch'"; // the directory to watch for new files
                // this initiates a filesystemmonitor to watch for new files being created 
                Task.Factory.StartNew(() => FileSystemMonitor.Instance.WatchDirectory(directorytowatch));          
                // initiate the processing of any new files
                FilesWorker.Instance.ReadQueue();
                Console.ReadLine();
            }
    
        }
        /// <summary>
        /// Monitors the filesystem in "real-time" to check for new files
        /// </summary>
        [PermissionSet(SecurityAction.Demand, Name = "FullTrust")]
        internal class FileSystemMonitor : SingletonBase<FileSystemMonitor>
        {
            private FileSystemMonitor()
            {
            }
            internal void WatchDirectory(string dir)
            {
                var watcher = new FileSystemWatcher(dir)
                {
                    NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite | NotifyFilters.LastAccess,
                    Filter = "*.*"
                };
                // watch all files
                watcher.Created += WatcherOnCreated;
                watcher.EnableRaisingEvents = true;
            }
            private static void WatcherOnCreated(object sender, FileSystemEventArgs fileSystemEventArgs)
            {
                Console.WriteLine(fileSystemEventArgs.FullPath + "" + fileSystemEventArgs.ChangeType); // for test purposes
                var fileInfo = new FileInfo(fileSystemEventArgs.FullPath);
                FilesWorker.Instance.AddToQueue(fileInfo);
            }
        }
        /// <summary>
        /// handles the queue of files to be processed and the syncronisation of tasks related to the queue
        /// </summary>
        internal class FilesWorker : SingletonBase<FilesWorker>
        {
            private FilesWorker()
            {
            }
            /// <summary>
            /// The queue of files which still need to be processed
            /// </summary>
            private readonly ConcurrentQueue<FileInfo> _filesQueue = new ConcurrentQueue<FileInfo>();
            /// <summary>
            /// create a semaphore to limit the number of threads which can process a file at any given time
            // In this case only allow 2 to be processed at any given time
            /// </summary>
            private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(2, 2);
            /// <summary>
            /// add new file to the queue
            /// </summary>
            /// <param name="fileInfo"></param>
            internal void AddToQueue(FileInfo fileInfo)
            {
                _filesQueue.Enqueue(fileInfo);
            }
            /// <summary>
            /// executes a method on a given timeframe
            /// </summary>
            /// <param name="method">method to execute</param>
            /// <param name="timer">time between execution runs (seconds)</param>
            internal void ExecuteMethod(Action method, double timer)
            {
                IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(timer));
                // Token for cancelation
                var source = new CancellationTokenSource();
                observable.Subscribe(x =>
                {
                    var task = new Task(method);
                    task.Start();
                }, source.Token);
            }
            /// <summary>
            /// Get any new files and send for processing
            /// </summary>
            internal void ReadQueue()
            {
                // check the queue every two seconds
                ExecuteMethod(ProcessQueue, 2d);            
            }
            /// <summary>
            /// takes files from the queue and starts processing
            /// </summary>
            internal void ProcessQueue()
            {
                try
                {
                    Semaphore.Wait();
                    FileInfo fileInfo;
                    while (_filesQueue.TryDequeue(out fileInfo))
                    {
                        var fileProcessor = new FileProcessor();
                        fileProcessor.ProcessFile(fileInfo);
                    }
                }
                finally
                {
                    Semaphore.Release();
                }
            }
        }
        internal class FileProcessor
        {
            internal void ProcessFile(FileInfo fileInfo)
            {
                // do some long running tasks with the file
            }
        }
    
        /// <summary>
        /// Implements singleton pattern on all classes which derive from it
        /// </summary>
        /// <typeparam name="T">Derived class</typeparam>
        public abstract class SingletonBase<T> where T : class
        {
            public static T Instance
            {
                get { return SingletonFactory.Instance; }
            }
            /// <summary>
            /// The singleton class factory to create the singleton instance.
            /// </summary>
            private class SingletonFactory
            {
                static SingletonFactory()
                {
                }
                private SingletonFactory()
                {
                }
                internal static readonly T Instance = GetInstance();
                private static T GetInstance()
                {
                    var theType = typeof(T);
                    T inst;
                    try
                    {
                        inst = (T)theType
                            .InvokeMember(theType.Name,
                                BindingFlags.CreateInstance | BindingFlags.Instance
                                | BindingFlags.NonPublic,
                                null, null, null,
                                CultureInfo.InvariantCulture);
                    }
                    catch (MissingMethodException ex)
                    {
                        var exception = new TypeLoadException(string.Format(
                            CultureInfo.CurrentCulture,
                            "The type '{0}' must have a private constructor to " +
                            "be used in the Singleton pattern.", theType.FullName)
                            , ex);
                        //LogManager.LogException(LogManager.EventIdInternal, exception, "error in instantiating the singleton");
                        throw exception;
                    }
                    return inst;
                }
            }
        }
    }