使用 TPL 和/或后台辅助角色处理间歇性 IO 作业

本文关键字:处理 作业 IO 角色 TPL 后台 使用 | 更新日期: 2023-09-27 18:06:40


我目前正在尝试找出一种干净的方法来"收集"并与我的 UI 线程并行运行命令。我有多个窗体,每个设备一个,每个窗体有几个可以发出命令的控件。


当事件由CommandManager处理时,它会形成命令,并将其添加到名为 DeviceCommandWorker 的子类化BackgroundWorker中的BlockingCollection<Command>,该在应用程序打开时启动。它所做的只是循环访问包含Task.Factory.StartNew()调用的代码块。


// Event handler running on the UI Thread
public void ProcessCommand(DeviceCommand command)
    // I assume I cannot add to Commands (BlockingCollection) from here?
// ....
// BackgroundWorker started upon Application startup
// ...
public class DeviceCommandWorker : BackgroundWorker
    public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
    private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());
    public BlockingCollection<DeviceCommand> Commands { get; set; } 
    private DeviceCommandWorker()
        WorkerSupportsCancellation = true;
        Commands = new BlockingCollection<DeviceCommand>();
    protected override void OnDoWork(DoWorkEventArgs e)
        while (!CancellationPending)
            var commandTask = Task.Factory.StartNew(() =>
                // If the BackGroundWorker is cancelled, how to esacpe this blocking call?
                DeviceCommand command = commandQueue.Take(); 
                DeviceCommandResult result;
                command.Process(out result);
                if(result == DeviceCommandResult.Error)
                    ; // How would I inform the UI of this result?
        e.Cancel = true;


的第一个问题是我认为我不能从运行BackGroundWorker之外添加到BlockingCollection中?(通过阅读有关 TPL 的信息,添加时不应该有一个我可以锁定的同步对象吗?


最后,如何将命令执行中的成功或错误报告回 UI 线程?我已经看到了这个答案,这是正确的方向吗?

使用 TPL 和/或后台辅助角色处理间歇性 IO 作业


所以我重新设计了代码。我发现没有Task.Factory.StartNew()之外的Take,我实际上只是尽可能快地完成任务,每个任务都等待从BlockingCollection中消耗。所以我把语句移到了循环之外。我还确认,要摆脱阻塞Take我需要发送某种特殊命令,以便我可以停止后台工作线程。最后(未显示(我计划使用 Control.Invoke 将任何失败返回到 UI 线程。

public Boolean StartCommandWorker()
    if (DeviceCommandWorker.Instance.IsBusy)
        return false;
        Console.Out.WriteLine("Called start command worker!");
        return DeviceCommandWorker.Instance.IsBusy;
public void StopCommandWorker()
    Console.Out.WriteLine("Called stop command worker!");
public Boolean ProcessCommand(String command)
    Console.Out.WriteLine("Enqueued command '"" + command + "'" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
    return true;
internal class DeviceCommandWorker : BackgroundWorker
    public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
    private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());
    public BlockingCollection<String> Commands { get; set; }
    private DeviceCommandWorker()
        WorkerSupportsCancellation = true;
        Commands = new BlockingCollection<String>();
    protected override void OnDoWork(DoWorkEventArgs e)
        Console.Out.WriteLine("Background Worker Started ThreadID: " + Thread.CurrentThread.ManagedThreadId);
        while (!CancellationPending)
            Console.Out.WriteLine("Waiting for command on ThreadID: " + Thread.CurrentThread.ManagedThreadId);
            String aString = Commands.Take();
            var commandTask = Task.Factory.StartNew(() =>
                Console.Out.WriteLine("   Dequeued command '"" + aString + "'" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                if (aString.Equals("QUIT"))
                    Console.Out.WriteLine("   Quit worker called: " + aString);
        Console.Out.WriteLine("Background Worker: Stopped!");
        e.Cancel = true;

下面是一些示例输出。我制作了一个小的 UI 窗体,可以用来启动、停止和发送命令。它只是发送一个随机的双精度作为命令。

Called start command worker!
Background Worker Started ThreadID: 17
Waiting for command on ThreadID: 17
Enqueued command "Hello" ThreadID: 10
Waiting for command on ThreadID: 17
   Dequeued command "Hello" ThreadID: 16
Enqueued command "0.0745" ThreadID: 10
Waiting for command on ThreadID: 17
   Dequeued command "0.0745" ThreadID: 12
Enqueued command "0.7043" ThreadID: 10
   Dequeued command "0.7043" ThreadID: 16
Waiting for command on ThreadID: 17
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
   Dequeued command "QUIT" ThreadID: 12
   Quit worker called: QUIT
Background Worker: Stopped!
Enqueued command "0.2646" ThreadID: 10
Enqueued command "0.1619" ThreadID: 10
Enqueued command "0.5570" ThreadID: 10
Enqueued command "0.4129" ThreadID: 10
Called start command worker!
Background Worker Started ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
   Dequeued command "0.2646" ThreadID: 16
   Dequeued command "0.1619" ThreadID: 16
   Dequeued command "0.5570" ThreadID: 16
   Dequeued command "0.4129" ThreadID: 16
Enqueued command "0.8368" ThreadID: 10
   Dequeued command "0.8368" ThreadID: 17
Waiting for command on ThreadID: 12
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
   Dequeued command "QUIT" ThreadID: 16
   Quit worker called: QUIT
Background Worker: Stopped!