如何可靠地从目录中(异步)移动文件,因为它们';正在创建

本文关键字:因为 创建 文件 移动 何可 异步 | 更新日期: 2023-09-27 18:24:22

在我的Windows服务解决方案中,有一个FileSystemWatcher监视目录树中的新文件,每当它触发Created事件时,我都会尝试将文件异步移动到另一台服务器以进行进一步处理。这是代码:

foreach (string fullFilePath in 
                Directory.EnumerateFiles(directoryToWatch, "*.*",  
                                         SearchOption.AllDirectories)
                         .Where(filename => fileTypes.Contains(Path.GetExtension(filename))))
            {
                string filename = Path.GetFileName(fullFilePath);
                using (FileStream sourceStream = File.Open(filename, FileMode.Open, FileAccess.Read))
                {
                    using (FileStream destStream = File.Create(Path.Combine(destination, filename)))
                    {
                        await sourceStream.CopyToAsync(destStream);
                    }
                }
            }

问题是,当这些文件被复制到我正在观察的文件夹中时,它们并不总是解锁的,对我来说是可用的。我想在遇到锁定的文件时"重试",但我不习惯异步思考,所以我不知道如何将出错的文件放回队列。

如何可靠地从目录中(异步)移动文件,因为它们';正在创建

首先,您需要"检测"异步执行过程中抛出的异常。这可以通过以下方式实现:

        try
        {
            await sourceStream.CopyToAsync(destStream);
        }
        catch (Exception copyException)
        {
        }

一旦检测到异常并得到正确处理,即您决定某个特定异常是重试的原因,您就必须维护自己的复制目标(和目标)队列,以便重试。

然后,您将不得不组织一个新的入口点,这将导致重试本身。这样的入口点可以由计时器或您使用的文件系统监视器的下一个事件触发(我不建议这样做)。在发生多个故障的情况下,您还必须实现对队列溢出的检测。请记住,这种溢出检测也存在于文件系统监视器中,如果有太多的系统事件(许多文件似乎同时被复制到受监视的文件夹中),该监视器可以简单地跳过通知。

如果这些问题不太困扰你,我建议你实现一个定时器,或者更准确地说是一个超时,以便重试复制任务。另一方面,如果您需要一个健壮的解决方案,我会自己实现一个文件系统监视器。

关于超时,它可能看起来像这样:

    private Queue<myCopyTask> queue;
    private Timer retryTimeout;
    public Program()
    {
        retryTimeout = new Timer(QueueProcess, null, Timeout.Infinite, Timeout.Infinite);
    }
    private void FileSystemMonitorEventhandler()
    {
        //New tasks are provided by the file system monitor.
        myCopyTask newTask = new myCopyTask();
        newTask.sourcePath = "...";
        newTask.destinationPath = "...";
        //Keep in mind that queue is touched from different threads.
        lock (queue)
        {
            queue.Enqueue(newTask);
        }
        //Keep in mind that Timer is touched from different threads.
        lock (retryTimeout)
        {
            retryTimeout.Change(1000, Timeout.Infinite);
        }
    }
    //Start this routine only via Timer.
    private void QueueProcess(object iTimeoutState)
    {
        myCopyTask task = null;
        do
        {
            //Keep in mind that queue is touched from different threads.
            lock (queue)
            {
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }
            if (task != null)
            {
                CopyTaskProcess(task);
            }
        } while (task != null);
    }
    private async void CopyTaskProcess(myCopyTask task)
    {
        FileStream sourceStream = null;
        FileStream destStream = null;
        try
        {
            sourceStream = File.OpenRead(task.sourcePath);
            destStream = File.OpenWrite(task.destinationPath);
            await sourceStream.CopyToAsync(destStream);
        }
        catch (Exception copyException)
        {
            task.retryCount++;
            //In order to avoid instant retries on several problematic tasks you probably 
            //should involve a mechanism to delay retries. Keep in mind that this approach
            //delays worker thread that is implicitly involved by await keyword.
            Thread.Sleep(100);
            //Keep in mind that queue is touched from different threads.
            lock (queue)
            {
                queue.Enqueue(task);
            }
            //Keep in mind that Timer is touched from different threads.
            lock (retryTimeout)
            {
                retryTimeout.Change(1000, Timeout.Infinite);
            }
        }
        finally
        {
            if (sourceStream != null)
            {
                sourceStream.Close();
            }
            if (destStream != null)
            {
                destStream.Close();
            }
        }
    }
}
internal class myCopyTask
{
    public string sourcePath;
    public string destinationPath;
    public long retryCount;
}
相关文章: