我已经扩展了Stephen Cleary的例子为我的数据流管道,并使用


public class ThrottledProducerConsumer<T>
    private class TimerState<T1>
        public SemaphoreSlim Sem;
        public T1 Value;
    private BufferBlock<T> _queue;
    private IPropagatorBlock<T, T> _throttleBlock;
    private List<Task> _consumers;
    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
        SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval);
        return new TransformBlock<T1, T1>(async (x) =>
            var sw = new Stopwatch();
            //Console.WriteLine($"Current count: {_sem.CurrentCount}");
            await _sem.WaitAsync();
            var now = DateTime.UtcNow;
            var releaseTime = now.Add(Interval) - now;
            //-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete
            var tm = new Timer((s) => {
                var state = (TimerState<T1>)s;
                //Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
            }, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds,
                Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
            //Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}");
            return x;
             //new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
             new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true,  };
        //-- Create the Queue
        _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
        //-- Create and link the throttle block
        _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
        _queue.LinkTo(_throttleBlock, linkOptions);
        //-- Create and link the consumer(s) to the throttle block
        var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
        _consumers = new List<Task>();
        for (int i = 0; i < MaxConsumers; i++)
            var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
            _throttleBlock.LinkTo(consumer, linkOptions);
        //-- TODO: Add some cancellation tokens to shut this thing down
   /// <summary>
   /// Default Consumer Action, just prints to console
   /// </summary>
   /// <param name="ItemToConsume"></param>
    private void ConsumeItem(T ItemToConsume)
        Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
    public async Task EnqueueAsync(T ItemToEnqueue)
        await this._queue.SendAsync(ItemToEnqueue);
    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
        foreach (var item in ItemsToEnqueue)
            await this._queue.SendAsync(item);
    public async Task CompleteAsync()
        await Task.WhenAll(_consumers);
        Console.WriteLine($"All consumers completed {DateTime.UtcNow}");


    public class WorkItem<T>
        public TaskCompletionSource<T> tcs;
        //public T respone;
        public string url;
        public WorkItem(string Url)
            tcs = new TaskCompletionSource<T>();
            url = Url;
        public override string ToString()
            return $"{url}";
    public static void TestQueue()
        Console.WriteLine("Created the queue");
        var defaultAction = new Action<WorkItem<String>>(async i => {
            var taskItem = ((WorkItem<String>)i);
            Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
            //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
            await Task.Delay(5000);
            //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
        var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);
        var results = new List<Task>();
        foreach (var no in Enumerable.Range(0, 20))
            var workItem = new WorkItem<String>($"http://someurl{no}.com");
            results.Add(workItem.tcs.Task.ContinueWith(response =>
                Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
        Console.WriteLine("All Work Items Have Been Processed");



  1. 我正在使用BufferBlock作为异步队列,这链接到:
  2. 一个TransformBlock,它提供了我需要的节流和阻塞。它与SempahoreSlim一起使用来控制最大请求数。当每个项目通过块时,它增加信号量,并安排一个任务在以后运行X持续时间,以释放一个信号量。这样,我就有了一个滑动窗口,每个持续时间有X个请求;这正是我想要的。由于TPL,我还利用了连接的并行性:
  3. ActionBlock(s),负责执行我需要的任务。




public class ThrottledProducerConsumer<T>
    private BufferBlock<T> _queue;
    private IPropagatorBlock<T, T> _throttleBlock;
    private List<Task> _consumers;
    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, 
        Int32 MaxPerInterval, Int32 BlockBoundedMax = 2, Int32 BlockMaxDegreeOfParallelism = 2)
        SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval, MaxPerInterval);
        return new TransformBlock<T1, T1>(async (x) =>
            //Log($"Transform blk: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count: {_sem.CurrentCount}");
            var sw = new Stopwatch();
            //Console.WriteLine($"Current count: {_sem.CurrentCount}");
            await _sem.WaitAsync();
            var delayTask = Task.Delay(Interval).ContinueWith((t) =>
                //Log($"Pre-RELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count {_sem.CurrentCount}");
                //Log($"PostRELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphoere Count {_sem.CurrentCount}");
            //Log($"Transformed: {x} in queue {sw.ElapsedMilliseconds}ms. {DateTime.Now:mm:ss:ff} will release {DateTime.Now.Add(Interval):mm:ss:ff} Semaphoere Count {_sem.CurrentCount}");
            return x;
             //-- Might be better to keep Bounded Capacity in sync with the semaphore
             new ExecutionDataflowBlockOptions { BoundedCapacity = BlockBoundedMax,
                 MaxDegreeOfParallelism = BlockMaxDegreeOfParallelism });
    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, 
        Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1, 
        Int32 MaxThrottleBuffer = 20, Int32 MaxDegreeOfParallelism = 10)
        //-- Probably best to link MaxPerInterval and MaxThrottleBuffer 
        //  and MaxConsumers with MaxDegreeOfParallelism
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true,  };
        //-- Create the Queue
        _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
        //-- Create and link the throttle block
        _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
        _queue.LinkTo(_throttleBlock, linkOptions);
        //-- Create and link the consumer(s) to the throttle block
        var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
        _consumers = new List<Task>();
        for (int i = 0; i < MaxConsumers; i++)
            var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
            _throttleBlock.LinkTo(consumer, linkOptions);
        //-- TODO: Add some cancellation tokens to shut this thing down
   /// <summary>
   /// Default Consumer Action, just prints to console
   /// </summary>
   /// <param name="ItemToConsume"></param>
    private void ConsumeItem(T ItemToConsume)
        Log($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
    public async Task EnqueueAsync(T ItemToEnqueue)
        await this._queue.SendAsync(ItemToEnqueue);
    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
        foreach (var item in ItemsToEnqueue)
            await this._queue.SendAsync(item);
    public async Task CompleteAsync()
        await Task.WhenAll(_consumers);
        Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
    private static void Log(String messageToLog)



public class WorkItem<Toutput,Tinput>
    private TaskCompletionSource<Toutput> _tcs;
    public Task<Toutput> Task { get { return _tcs.Task; } }
    public Tinput InputData { get; private set; }
    public Toutput OutputData { get; private set; }
    public WorkItem(Tinput inputData)
        _tcs = new TaskCompletionSource<Toutput>();
        InputData = inputData;
    public void Complete(Toutput result)
    public void Failed(Exception ex)
    public override string ToString()
        return InputData.ToString();


    private Action<WorkItem<Location,PointToLocation>> CreateProcessingAction()
        return new Action<WorkItem<Location,PointToLocation>>(async i => {
            var sw = new Stopwatch();
            var taskItem = ((WorkItem<Location,PointToLocation>)i);
            var inputData = taskItem.InputData;
            //Log($"Consuming: {inputData.Latitude},{inputData.Longitude} {DateTime.UtcNow:mm:ss:ff}");
            //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
            await Task.Delay(500);
            Location outData = new Location()
                Latitude = inputData.Latitude,
                Longitude = inputData.Longitude,
                StreetAddress = $"Consumed: {inputData.Latitude},{inputData.Longitude} Duration(ms): {sw.ElapsedMilliseconds}"
            //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");


    int startRange = 0;
    int nextRange = 1000;
    ThrottledProducerConsumer<WorkItem<Location,PointToLocation>> tpc;
    private void cmdTestPipeline_Click(object sender, EventArgs e)
        Log($"Pipeline test started {DateTime.Now:HH:mm:ss:ff}");
        if(tpc == null)
            tpc = new ThrottledProducerConsumer<WorkItem<Location, PointToLocation>>(
                //1010, 2, 20000,
                TimeSpan.FromMilliseconds(1010), 45, 100000,
        var workItems = new List<WorkItem<Models.Location, PointToLocation>>();
        foreach (var i in Enumerable.Range(startRange, nextRange))
            var ptToLoc = new PointToLocation() { Latitude = i + 101, Longitude = i + 100 };
            var wrkItem = new WorkItem<Location, PointToLocation>(ptToLoc);

            wrkItem.Task.ContinueWith(t =>
                var loc = t.Result;
                string line = $"[Simulated:{DateTime.Now:HH:mm:ss:ff}] - {loc.StreetAddress}";
                //txtResponse.Text = String.Concat(txtResponse.Text, line, System.Environment.NewLine);
                //var lines = txtResponse.Text.Split(new string[] { System.Environment.NewLine},
                //    StringSplitOptions.RemoveEmptyEntries).LongCount();
                //lblLines.Text = lines.ToString();
            //}, TaskScheduler.FromCurrentSynchronizationContext());
        startRange += nextRange;
        Log($"Pipeline test completed {DateTime.Now:HH:mm:ss:ff}");