iproducerconsumercollection是什么类型的?用于我的任务

本文关键字:用于 我的 任务 是什么 类型 iproducerconsumercollection | 更新日期: 2023-09-27 17:53:51

我有100个传感器每个"测量"自己的数据。我正好有一个数据发送器,应该从"传感器"发送信息。应该发送最新的信息。

通道带宽可能小于100个传感器产生的数据。在这种情况下,一些数据可以被跳过——但我们应该"大致公平"。例如,我们可以跳过每个传感器的每秒测量。

我不知道每个传感器生成数据的频率,但总的来说它们生成数据的频率相当高。

在我的其他帖子之后:

  • 如何创建总是在单独的线程中运行的单例?
  • 修改后的生产者/消费者的例子,有什么问题吗?

我已经决定我有经典的生产者/消费者问题,与:

  • 100生产者和
  • 1消费者

有人建议我使用BlockingCollectionBlockingCollection的唯一问题-一旦您添加了项目,您就不能替换它。但是在我的应用中,如果传感器产生了一个新值,而之前的值没有被Consumer处理,那么该值应该用替换

我应该使用ConcurentDictionary还是ConcurentBag来完成这个任务?

从概念上讲,我只需要一个包含100个元素的数组。

传感器#33应该的值替换为数组[33]:

| Sensor | Value |
|--------|-------|
|      1 |       |
|      2 |       |
|      3 |       |
/......../......./
|     32 |       |
|     33 | 101.9 |
|     34 |       |
/......../......./
|     98 |       |
|     99 |       |
|    100 |       |

Consumer应该从array[33]获取值,如果不是null,则发送它并设置array[33]为null。

iproducerconsumercollection是什么类型的?用于我的任务

Consumer应该对数组中的非空值尽快做出反应。

我认为你应该实现自己的IProducerConsumerCollection<T>。这就是为什么它是一个接口:这样你就可以很容易地创建自己的。

你可以使用Dictionary<K,V>Queue<T>来确保接收数据是公平的,也就是说,如果你只有一个设备产生数据非常快,你不会只从这个设备发送数据。

public class DeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    private readonly Dictionary<TDevice, TData> m_data
        = new Dictionary<TDevice, TData>();
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();
    //some obviously implemented methods elided, just make sure they are thread-safe
    public int Count { get { return m_queue.Count; } }
    public object SyncRoot { get { return m_lockObject; } }
    public bool IsSynchronized { get { return true; } }
    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        var device = item.Item1;
        var data = item.Item2;
        lock (m_lockObject)
        {
            if (!m_data.ContainsKey(device))
                m_queue.Enqueue(device);
            m_data[device] = data;
        }
        return true;
    }
    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0)
            {
                item = null;
                return false;
            }
            var device = m_queue.Dequeue();
            var data = m_data[device];
            m_data.Remove(device);
            item = Tuple.Create(device, data);
            return true;
        }
    }
}

Queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>());
Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue);
Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue);
while (true)
{
    var tuple = Queue.Take();
    var device = tuple.Item1;
    var data = tuple.Item2;
    Console.WriteLine("{0}: Device {1} produced data at {2}.",
        DateTime.Now, device.Id, data.Created);
    Thread.Sleep(TimeSpan.FromSeconds(2));
}

产生如下输出:

30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43.
30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44.
30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47.
30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49.
30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51.
30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54.

不使用另一种数据结构,而是使用另一种技巧。集合中的元素不能被替换,但您可以存储一个迷你容器,而不是存储实际值。当你想替换的时候,你实际上是替换容器中的值,而不是替换容器。

class ElementFromQueue
{
     public object SensorData;
}
...
ElementFromQueue elem = new ElementFromQueue();
elem.SensorData = new object();
...
queue.Add(elem); //Element is in queue now
...
elem.SensorData = new object(); //Update the data, simulating replace

或者只是创建一个索引队列,将指向一个传感器编号。当弹出一个值时,将从另一个可更新的集合

查询最新的传感器值。