Multiple concurrent periodic refresh operations in C#

Keywords: refresh, operation, concurrency | Updated: 2023-09-27 18:32:28

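I need to perform some operations periodically. The main requirements are:
1) do not start the next update cycle while the previous one has not finished;
2) do not start an update if the data obtained in the previous iteration is still valid, i.e. the time since the last refresh is less than the TTL value;
3) separate threads (e.g. more than 10) are needed to perform such updates.
There are many similar questions on SO, and that is where I found this PeriodicTaskFactory implementation by @Jim.
It works as expected, but when several such factories are spawned simultaneously I start to see some overhead during the refreshes, which distorts the whole process (several upcoming iterations get cancelled). Here is the code: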

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CA_TasksTests
{
    class Program
    {
        // Result
        public class Result
        {
            public string Message { get; set; }
            public Result(int iter, string info)
            {
                Message = "#" + iter + info;
            }
            public override string ToString()
            {
                return Message;
            }
        }
        // Operation parameters
        public class Operation
        {
            public string OperationName { get; set; }
            public TimeSpan TTL { get { return TimeSpan.FromMilliseconds(Interval); } }
            public DateTime LastUpdate { get; set; }
            public Operation(int id)
            {
                OperationName = "Operation" + ((id < 10) ? "0" : "") + id;
            }
        }
        public static int Interval = 2000;
        public static int Duration = 10000;
        public static int OperationsCount = 10;
        static void Main()
        {
            // Creating 10 test operations
            var operations = Enumerable.Range(1, OperationsCount).Select(i => new Operation(i)).ToList();
            // Executing them
            var r = ExecuteActions(operations).OrderBy(i => i.Message).ToList();
            Console.WriteLine("Results (expected=" + (Duration/Interval*OperationsCount) + ") : " + r.Count);
            Console.ReadLine();
        }
        // Operation execution
        public static Result ExecuteOperation(int iter, Operation operation)
        {
            // Assigning last update timestamp
            operation.LastUpdate = DateTime.Now;
            var t = Task.Factory.StartNew(() =>
                {
                    // Some operation
                    Thread.Sleep(1000);
                    return new Result(iter, operation.OperationName);
                });
            var r = t.Result;
            return r;
        }
        public static List<Result> ExecuteActions(List<Operation> operations)
        {
            var list = new List<Result>();
            var tasks = new ConcurrentBag<Task>();
            foreach (var currentOperation in operations)
            {
                var iter = 0;
                var locker = new object();
                Operation operation = currentOperation;
                var perdiodicTask = PeriodicTaskFactory.Start(() =>
                                {
                                    // (*) Looking if we need updates semantically - 
                                    // through comparing time since last refresh with operation TTL
                                    Console.WriteLine(DateTime.Now + " : " + (DateTime.Now - operation.LastUpdate) + " >? " + operation.TTL);
                                    // Looking if we need updates logically -
                                    // if previous operation is still running
                                    if (!Monitor.TryEnter(locker))
                                    {
                                        Console.WriteLine(">>>" + DateTime.Now + " Cancelled");
                                        return;
                                    }
                                    try
                                    {
                                        // Semantic update
                                        if (DateTime.Now - operation.LastUpdate > operation.TTL)
                                        {
                                            iter++;
                                            Console.WriteLine(DateTime.Now + " Refresh #" + iter + " " + operation.OperationName);
                                            list.Add(ExecuteOperation(iter, operation));
                                        }
                                    }
                                    finally
                                    {
                                        Monitor.Exit(locker);
                                    }
                                }, intervalInMilliseconds: (int)operation.TTL.TotalMilliseconds, duration: Duration /*maxIterations:2*/);

                var end = perdiodicTask.ContinueWith(_ =>
                    {
                        _.Dispose();
                        Console.WriteLine(">>>" + DateTime.Now + " " + operation.OperationName + " finished");
                    });
                tasks.Add(end);
            }
            Task.WaitAll(tasks.ToArray());
            return list;
        }
    }
    /// <summary>
    /// Factory class to create a periodic Task to simulate a <see cref="System.Threading.Timer"/> using <see cref="Task">Tasks.</see>
    /// </summary>
    public static class PeriodicTaskFactory
    {
        /// <summary>
        /// Starts the periodic task.
        /// </summary>
        /// <param name="action">The action.</param>
        /// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds, i.e. how long it waits to kick off the timer.</param>
        /// <param name="duration">The duration.
        /// <example>If the duration is set to 10 seconds, the maximum time this task is allowed to run is 10 seconds.</example></param>
        /// <param name="maxIterations">The max iterations.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create the task for executing the <see cref="Action"/>.</param>
        /// <returns>A <see cref="Task"/></returns>
        /// <remarks>
        /// Exceptions that occur in the <paramref name="action"/> need to be handled in the action itself. These exceptions will not be 
        /// bubbled up to the periodic task.
        /// </remarks>
        public static Task Start(Action action,
                                 int intervalInMilliseconds = Timeout.Infinite,
                                 int delayInMilliseconds = 0,
                                 int duration = Timeout.Infinite,
                                 int maxIterations = -1,
                                 bool synchronous = false,
                                 CancellationToken cancelToken = new CancellationToken(),
                                 TaskCreationOptions periodicTaskCreationOptions = TaskCreationOptions.None)
        {
            //Console.WriteLine(DateTime.Now + " PeriodicTaskFactory.Start");
            Stopwatch stopWatch = new Stopwatch();
            Action wrapperAction = () =>
            {
                CheckIfCancelled(cancelToken);
                action();
            };
            Action mainAction = () =>
            {
                MainPeriodicTaskAction(intervalInMilliseconds, delayInMilliseconds, duration, maxIterations, cancelToken, stopWatch, synchronous, wrapperAction, periodicTaskCreationOptions);
            };
            return Task.Factory.StartNew(mainAction, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
        }
        /// <summary>
        /// Mains the periodic task action.
        /// </summary>
        /// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds.</param>
        /// <param name="duration">The duration.</param>
        /// <param name="maxIterations">The max iterations.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="stopWatch">The stop watch.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="wrapperAction">The wrapper action.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create a sub task for executing the <see cref="Action"/>.</param>
        private static void MainPeriodicTaskAction(int intervalInMilliseconds,
                                                   int delayInMilliseconds,
                                                   int duration,
                                                   int maxIterations,
                                                   CancellationToken cancelToken,
                                                   Stopwatch stopWatch,
                                                   bool synchronous,
                                                   Action wrapperAction,
                                                   TaskCreationOptions periodicTaskCreationOptions)
        {
            var iters = duration / intervalInMilliseconds;
            if (iters > 0)
            {
                maxIterations = iters;
            }
            TaskCreationOptions subTaskCreationOptions = TaskCreationOptions.AttachedToParent | periodicTaskCreationOptions;
            CheckIfCancelled(cancelToken);
            if (delayInMilliseconds > 0)
            {
                Thread.Sleep(delayInMilliseconds);
            }
            if (maxIterations == 0) { return; }
            int iteration = 0;
            ////////////////////////////////////////////////////////////////////////////
            // using a ManualResetEventSlim as it is more efficient in small intervals.
            // In the case where longer intervals are used, it will automatically use 
            // a standard WaitHandle....
            // see http://msdn.microsoft.com/en-us/library/vstudio/5hbefs30(v=vs.100).aspx
            using (ManualResetEventSlim periodResetEvent = new ManualResetEventSlim(false))
            {
                ////////////////////////////////////////////////////////////
                // Main periodic logic. Basically loop through this block
                // executing the action
                while (true)
                {
                    CheckIfCancelled(cancelToken);
                    Task subTask = Task.Factory.StartNew(wrapperAction, cancelToken, subTaskCreationOptions, TaskScheduler.Current);
                    if (synchronous)
                    {
                        stopWatch.Start();
                        try
                        {
                            subTask.Wait(cancelToken);
                        }
                        catch { /* do not let an errant subtask to kill the periodic task...*/ }
                        stopWatch.Stop();
                    }
                    // use the same Timeout setting as the System.Threading.Timer, infinite timeout will execute only one iteration.
                    if (intervalInMilliseconds == Timeout.Infinite) { break; }
                    iteration++;
                    if (maxIterations > 0 && iteration >= maxIterations) { break; }
                    try
                    {
                        stopWatch.Start();
                        periodResetEvent.Wait(intervalInMilliseconds, cancelToken);
                        stopWatch.Stop();
                    }
                    finally
                    {
                        periodResetEvent.Reset();
                    }
                    CheckIfCancelled(cancelToken);
                    if (duration > 0 && stopWatch.ElapsedMilliseconds >= duration) { break; }
                }
            }
        }
        /// <summary>
        /// Checks if cancelled.
        /// </summary>
        /// <param name="cancelToken">The cancel token.</param>
        private static void CheckIfCancelled(CancellationToken cancellationToken)
        {
            // CancellationToken is a struct and can never be null, so no null check is needed.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}

The output of the TTL comparison check (*) shows:

9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.0020000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:00.9940000 >? 00:00:02

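So a few updates get cancelled because of this overhead. What could be causing it, and how can it be fixed? My first guess is thread-switching cost, which would mean adding some epsilon to that comparison and living with it. Thanks for your help.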
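
The epsilon idea can be sketched as follows. In the log above, the elapsed time is 1.991 s against a TTL of 2 s, so a strict `>` comparison skips that refresh by a few milliseconds. This is only one plausible reading of the log, and the helper name `FreshnessCheck.IsStale` and the 50 ms tolerance used in the example are assumptions, not part of the original code.

```csharp
using System;

// Hypothetical helper, not part of the original code.
public static class FreshnessCheck
{
    // Returns true when the data should be refreshed. The tolerance absorbs
    // wake-ups that land slightly before the TTL has fully elapsed.
    public static bool IsStale(TimeSpan elapsed, TimeSpan ttl, TimeSpan tolerance)
    {
        return elapsed + tolerance >= ttl;
    }
}
```

With a 50 ms tolerance, `IsStale(TimeSpan.FromMilliseconds(1991), TimeSpan.FromSeconds(2), TimeSpan.FromMilliseconds(50))` is true, while the 1.002 s entries in the log are still treated as fresh.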


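That's a really convoluted way of doing things. I would suggest a different route, using System.Threading.Timer. There's no need for a lock, and the operations can run concurrently. Also, you can have different timing for each update.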

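To prevent reentrant updates (i.e. FooUpdate being triggered again while a previous FooUpdate is still running), you create a one-shot timer that you re-initialize after every update. So your timer looks like this: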

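// An infinite period makes this a one-shot timer (TimeSpan has no Infinite member).
System.Threading.Timer FooUpdateTimer = new System.Threading.Timer(
    FooUpdate, null, TimeSpan.FromSeconds(2), System.Threading.Timeout.InfiniteTimeSpan);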

And your FooUpdate looks like this:

DateTime LastFooUpdate = DateTime.MinValue;
void FooUpdate(object state)
{
    // check data freshness
    if ((DateTime.UtcNow - LastFooUpdate) > SomeMinimumTime)
    {
        // do update
        // and reset last update time
        LastFooUpdate = DateTime.UtcNow;
    }
    // then, re-arm the one-shot timer for the next tick
    FooUpdateTimer.Change(TimeSpan.FromSeconds(2), System.Threading.Timeout.InfiniteTimeSpan);
}
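
The one-shot re-arm pattern above can be checked with a small self-contained sketch. The class `OneShotRearmDemo` and its `Ticks` and `Overlaps` counters are hypothetical names introduced only for illustration. The simulated work deliberately takes twice the interval, so a periodic timer would overlap here while the one-shot timer should not.

```csharp
using System;
using System.Threading;

// Hypothetical demo class illustrating the one-shot re-arm pattern.
public sealed class OneShotRearmDemo : IDisposable
{
    private readonly Timer _timer;
    private readonly TimeSpan _interval;
    private int _running;   // 1 while a callback is executing
    public int Ticks;       // number of completed callbacks
    public int Overlaps;    // callbacks that started while another was running

    public OneShotRearmDemo(TimeSpan interval)
    {
        _interval = interval;
        // Create the timer disarmed, then arm it once the field is assigned.
        _timer = new Timer(Tick, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        _timer.Change(_interval, Timeout.InfiniteTimeSpan);
    }

    private void Tick(object state)
    {
        if (Interlocked.Exchange(ref _running, 1) == 1)
        {
            Interlocked.Increment(ref Overlaps);
        }
        try
        {
            // Simulated work that takes longer than the timer interval.
            Thread.Sleep(_interval + _interval);
            Interlocked.Increment(ref Ticks);
        }
        finally
        {
            Interlocked.Exchange(ref _running, 0);
            try
            {
                // Re-arm only after the work is done, so ticks cannot overlap.
                _timer.Change(_interval, Timeout.InfiniteTimeSpan);
            }
            catch (ObjectDisposedException)
            {
                // The timer was disposed while this callback was running.
            }
        }
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}
```

Creating `new OneShotRearmDemo(TimeSpan.FromMilliseconds(20))`, letting it run for a few hundred milliseconds and then disposing it should leave `Overlaps` at zero, because the next tick is scheduled only after the previous one has finished.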

If you want a BarUpdate that runs every 10 seconds, you duplicate the code above with a 10-second update time. That is:

System.Threading.Timer BarUpdateTimer = new System.Threading.Timer(
    BarUpdate, null, TimeSpan.FromSeconds(10), System.Threading.Timeout.InfiniteTimeSpan);
DateTime LastBarUpdate = DateTime.MinValue;
void BarUpdate(object state)
{
    ...
}

That's fine if you have just one or two of them. If you expect to have a bunch, then wrap that functionality into a class. Let's see...

using System;
using System.Threading;

class PeriodicUpdater
{
    private System.Threading.Timer _timer;
    private TimeSpan _interval;
    private DateTime _lastUpdateTime = DateTime.MinValue;
    private Action _updateAction;
    private TimeSpan _freshness;
    public PeriodicUpdater(Action updateAction, TimeSpan interval, TimeSpan freshness)
    {
        _interval = interval;
        _updateAction = updateAction;
        _freshness = freshness;
        // One-shot timer: an infinite period means it fires once and must be re-armed.
        _timer = new Timer(TimerTick, null, _interval, Timeout.InfiniteTimeSpan);
    }
    private void TimerTick(object state)
    {
        // Skip the update while the data is still fresh.
        if ((DateTime.UtcNow - _lastUpdateTime) >= _freshness)
        {
            _updateAction();
            _lastUpdateTime = DateTime.UtcNow;
        }
        // Re-arm only after the work is done, so ticks cannot overlap.
        _timer.Change(_interval, Timeout.InfiniteTimeSpan);
    }
}

And to create one:

var FooUpdater = new PeriodicUpdater(
    FooUpdateAction, 
    TimeSpan.FromSeconds(2.0),
    TimeSpan.FromSeconds(8.0));
var BarUpdater = new PeriodicUpdater(
    BarUpdateAction,
    TimeSpan.FromSeconds(10.0),
    TimeSpan.FromSeconds(15.5));
private void FooUpdateAction()
{
    // do the Foo update
}
private void BarUpdateAction()
{
    // do the Bar update
}

That should give you the basic idea.

Update: Cancellation

If you want to add support for cancellation, you pass a CancellationToken to the constructor and register a callback. So the constructor becomes:

    public PeriodicUpdater(Action updateAction, TimeSpan interval, 
        TimeSpan freshness, CancellationToken ct)
    {
        _interval = interval;
        _updateAction = updateAction;
        _freshness = freshness;
        _timer = new Timer(TimerTick, null, _interval, Timeout.InfiniteTimeSpan);
        ct.Register(Cancel);
    }

Then you add the Cancel method:

    private void Cancel()
    {
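        // Disarm the timer; Change(0, 0) would instead fire the callback once more immediately.
        _timer.Change(Timeout.Infinite, Timeout.Infinite);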
        _timer.Dispose();
    }
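
One detail worth guarding against: if Cancel disposes the timer while a tick is still running, the re-arming call to Change at the end of that tick throws ObjectDisposedException. The following is a minimal sketch of one way to handle that, not the original answer's code. The class name `CancellableUpdater` and the `Rearm` helper are assumptions made for illustration, and the freshness check is left out to keep it short.

```csharp
using System;
using System.Threading;

// Hypothetical variant of PeriodicUpdater; names are illustrative.
public sealed class CancellableUpdater
{
    private readonly Timer _timer;
    private readonly TimeSpan _interval;
    private readonly Action _updateAction;
    private readonly CancellationToken _ct;

    public CancellableUpdater(Action updateAction, TimeSpan interval, CancellationToken ct)
    {
        _updateAction = updateAction;
        _interval = interval;
        _ct = ct;
        // Create the timer disarmed so no tick can run before setup is complete.
        _timer = new Timer(TimerTick, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        ct.Register(Cancel);
        Rearm();
    }

    private void TimerTick(object state)
    {
        if (_ct.IsCancellationRequested) return;
        try
        {
            // Exceptions thrown here still propagate; handle them in the action.
            _updateAction();
        }
        finally
        {
            Rearm();
        }
    }

    private void Rearm()
    {
        if (_ct.IsCancellationRequested) return;
        try
        {
            _timer.Change(_interval, Timeout.InfiniteTimeSpan);
        }
        catch (ObjectDisposedException)
        {
            // Cancelled between the check above and the call to Change.
        }
    }

    private void Cancel()
    {
        _timer.Dispose();
    }
}
```

Checking the token before re-arming and swallowing ObjectDisposedException means a tick that is in flight at cancellation time finishes quietly and no further ticks are scheduled.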