线程是否有等待语句

本文关键字:语句 等待 是否 线程 | 更新日期: 2023-09-27 18:30:27

嗨,我

想知道是否有类似于 await 语句的东西,该语句用于任务,我可以用 c# 中的线程实现?

我想做的是:

启动线程 A,计算一些数据并将结果放在变量 x 上。在该变量x被传输到另一个线程 B 之后,同时线程 A 再次开始计算某些数据,而线程 B 使用结果 x 开始另一个计算。

更新:好的,似乎有一些混乱,所以我的描述会更准确:

我使用两个产生数据的传感器。需要以这样一种方式检索数据:检索 SensorA 数据(这需要很长时间),然后立即必须在另一个线程中检索来自 SensorB 的数据,而 SensorA 继续检索另一个数据块。问题是我不能在同一队列中将两个传感器的数据排队,但我需要将两个传感器的数据存储在一个数据结构/对象中。

我的想法是这样的:

  1. 从线程 A 中的传感器 A 获取数据。
  2. 将结果提供给线程 B 并重新启动线程 A。
  3. 当线程 A 再次运行时,线程 B 从传感器 B 获取数据,并从传感器 A 和 B 计算数据

您可以假设线程 A 总是需要比线程 B 更长的时间

线程是否有等待语句

正如我在评论中所说。这看起来像经典的生产者/消费者,我们可以使用例如 BlockingCollection .

这是对该页面中的示例的轻微修改:

BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// "Thread B"
Task.Run(() => 
{
    while (!dataItems.IsCompleted)
    {
        Data dataA = null;
        try
        {
            dataA = dataItems.Take();
        }
        catch (InvalidOperationException) { }
        if (dataA != null)
        {
            var dataB = ReadSensorB();
            Process(dataA,dataB);
        }
    }
    Console.WriteLine("'r'nNo more items to take.");
});
// "Thread A"
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data dataA = ReadSensorA();
        dataItems.Add(dataA);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

然后moreItemsToAdd就是您需要处理关闭此过程所需的任何代码。

我不确定你为什么要避免使用任务?也许你使用的是旧版本的 .net?如果是这样,Damien建议的BlockingCollection也不是一种选择。如果使用"普通"线程,则可以使用等待句柄在线程之间发出结果信号。例如,自动重置事件。

private int a;
private AutoResetEvent newResult = new AutoResetEvent(false);
private void ThreadA()
{
    while (true)
    {
        a = GetSensorA();
        newResult.Set();
    }
}
private void ThreadB()
{
    int b;
    while (true)
    {
        newResult.WaitOne();
        b = GetSensorB();         // or before "waitone"
        Console.WriteLine(a + b); // do something
    }
}

编辑:重置时有轻微错误,感谢您指出达米安 - 更新

如果可以使用 .Net 4.5 或更高版本,则解决此问题的最佳方法是使用 TPL 的 DataFlow 组件。

(必须使用 NuGet 安装数据流;默认情况下,它不是 CLR 的一部分。

下面是一个示例可编译控制台应用程序,它演示了如何使用数据流来执行此操作:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace SensorDemo
{
    public sealed class SensorAData
    {
        public int Data;
    }
    public sealed class SensorBData
    {
        public double Data;
    }
    public sealed class SensorData
    {
        public SensorAData SensorAData;
        public SensorBData SensorBData;
        public override string ToString()
        {
            return $"SensorAData = {SensorAData.Data}, SensorBData = {SensorBData.Data}";
        }
    }
    class Program
    {
        static void Main()
        {
            var sensorADataSource = new TransformBlock<SensorAData, SensorData>(
                sensorAData => addSensorBData(sensorAData), 
                dataflowOptions());
            var combinedSensorProcessor = new ActionBlock<SensorData>(
                data => process(data), 
                dataflowOptions());
            sensorADataSource.LinkTo(combinedSensorProcessor, new DataflowLinkOptions { PropagateCompletion = true });
            // Create a cancellation source that will cancel after a few seconds.
            var cancellationSource = new CancellationTokenSource(delay:TimeSpan.FromSeconds(20));
            Task.Run(() => continuouslyReadFromSensorA(sensorADataSource, cancellationSource.Token));
            Console.WriteLine("Started reading from SensorA");
            sensorADataSource.Completion.Wait(); // Wait for reading from SensorA to complete.
            Console.WriteLine("Completed reading from SensorA.");
            combinedSensorProcessor.Completion.Wait();
            Console.WriteLine("Completed processing of combined sensor data.");   
        }
        static async Task continuouslyReadFromSensorA(TransformBlock<SensorAData, SensorData> queue, CancellationToken cancellation)
        {
            while (!cancellation.IsCancellationRequested)
                await queue.SendAsync(readSensorAData());
            queue.Complete();
        }
        static SensorData addSensorBData(SensorAData sensorAData)
        {
            return new SensorData
            {
                SensorAData = sensorAData,
                SensorBData = readSensorBData()
            };
        }
        static SensorAData readSensorAData()
        {
            Console.WriteLine("Reading from Sensor A");
            Thread.Sleep(1000); // Simulate reading sensor A data taking some time.
            int value = Interlocked.Increment(ref sensorValue);
            Console.WriteLine("Read Sensor A value = " + value);
            return new SensorAData {Data = value}; 
        }
        static SensorBData readSensorBData()
        {
            Console.WriteLine("Reading from Sensor B");
            Thread.Sleep(100); // Simulate reading sensor B data being much quicker.
            int value = Interlocked.Increment(ref sensorValue);
            Console.WriteLine("Read Sensor B value = " + value);
            return new SensorBData {Data = value};
        }
        static void process(SensorData value)
        {
            Console.WriteLine("Processing sensor data: " + value);
            Thread.Sleep(1000); // Simulate slow processing of combined sensor values.
        }
        static ExecutionDataflowBlockOptions dataflowOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity        = 1
            };
        }
        static int sensorValue;
    }
}