混合异步任务和阻塞同步任务

本文关键字:任务 同步 异步 混合 | 更新日期: 2023-09-27 18:01:52

我正在编写一组异步任务,删除下载和解析数据,但是我在下一步更新数据库时遇到了一点空白。

问题是,出于性能的考虑,我正在使用一个TableLock来加载相当大的数据集,所以我想做的是让我的导入服务等待第一个Task返回,开始导入。如果在第一个导入运行时另一个Task完成,该流程将加入队列并等待任务1的导入服务完成。

异步——Task1——Task2——Task3

同步- ImportService

RunAsync任务
Task3 returns first > ImportService.Import(Task3)
Task1 return, ImportService is still running. Wait()
ImportService.Complete() event
Task2 returns. Wait()
ImportService.Import(Task1)
ImportService.Complete() event
ImportService.Import(Task2)
ImportService.Complete() event

希望这是有意义的!

混合异步任务和阻塞同步任务

这里不能真正使用await,但是可以等待多个任务完成:

var tasks = new List<Task)();
// start the tasks however
tasks.Add(Task.Run(Task1Function);
tasks.Add(Task.Run(Task2Function);
tasks.Add(Task.Run(Task2Function);
while (tasks.Count > 0)
{
   var i = Task.WaitAny(tasks.ToArray()); // yes this is ugly but an array is required
   var task = tasks[i];
   tasks.RemoveAt(i);
   ImportService.Import(task); // do you need to pass the task or the task.Result
}

在我看来应该有一个更好的选择。您可以让任务和导入运行,并在ImportService部分添加锁,例如:

// This is the task code doing whatever
....
// Task finishes and calls ImportService.Import
lock(typeof(ImportService)) // actually the lock should probably be inside the Import method
{
   ImportService.Import(....);
}

有几件事让我对你的需求感到困扰(包括使用静态ImportService,静态类很少是一个好主意),但没有进一步的细节,我无法提供更好的建议。

虽然这可能不是最优雅的解决方案,但我会尝试启动工作任务并让它们将其输出放在ConcurrentQueue中。您可以在计时器上检查工作队列,直到所有任务完成。

var rand = new Random();
var importedData = new List<string>();
var results = new ConcurrentQueue<string>();
var tasks = new List<Task<string>>
{
    new Task<string>(() =>
    {
        Thread.Sleep(rand.Next(1000, 5000));
        Debug.WriteLine("Task 1 Completed");
        return "ABC";
    }),
    new Task<string>(() =>
    {
        Thread.Sleep(rand.Next(1000, 5000));
        Debug.WriteLine("Task 2 Completed");
        return "FOO";
    }),
    new Task<string>(() =>
    {
        Thread.Sleep(rand.Next(1000, 5000));
        Debug.WriteLine("Task 3 Completed");
        return "BAR";
    })
};
tasks.ForEach(t =>
{
    t.ContinueWith(r => results.Enqueue(r.Result));
    t.Start();
});
var allTasksCompleted = new AutoResetEvent(false);
new Timer(state =>
{
    var timer = (Timer) state;
    string item;
    if (!results.TryDequeue(out item)) 
        return;
    importedData.Add(item);
    Debug.WriteLine("Imported " + item);
    if (importedData.Count == tasks.Count)
    {
        timer.Dispose();
        Debug.WriteLine("Completed.");
        allTasksCompleted.Set();
    }
}).Change(1000, 100);

allTasksCompleted.WaitOne();