确保线程池中的任务执行顺序
本文关键字:任务 执行 顺序 线程 确保 | 更新日期: 2023-09-27 18:06:43
我一直在阅读有关线程池模式的信息,但我似乎找不到以下问题的常用解决方案。
我有时希望任务串行执行。例如,我从文件中读取文本块,出于某种原因,我需要按该顺序处理这些块。所以基本上我想消除某些任务的并发性。
请考虑此方案,其中需要按推送顺序处理具有*
的任务。其他任务可以按任意顺序处理。
push task1
push task2
push task3 *
push task4 *
push task5
push task6 *
....
and so on
在线程池的上下文中,如果没有此约束,单个待处理任务队列可以正常工作,但显然在这里它不能。
我想过让一些线程在特定于线程的队列上运行,而其他线程在"全局"队列上运行。然后,为了串行执行某些任务,我只需将它们推送到单个线程查找的队列中即可。听起来确实有点笨拙。
所以,这个长篇故事中真正的问题是:你会如何解决这个问题?您将如何确保这些任务按顺序排列?
编辑
作为一个更普遍的问题,假设上面的场景变成
push task1
push task2 **
push task3 *
push task4 *
push task5
push task6 *
push task7 **
push task8 *
push task9
....
and so on
我的意思是,组中的任务应该按顺序执行,但组本身可以混合执行。因此,例如,您可以拥有3-2-5-4-7
。
需要注意的另一件事是,我无法预先访问组中的所有任务(而且我不能在开始组之前等待所有任务到达(。
如下所示的内容将允许串行和并行任务排队,其中串行任务将一个接一个地执行,并行任务将以任何顺序执行,但并行。这使您能够在必要时序列化任务,也可以执行并行任务,但在收到任务时执行此操作,即您不需要预先了解整个序列,执行顺序是动态维护的。
internal class TaskQueue
{
private readonly object _syncObj = new object();
private readonly Queue<QTask> _tasks = new Queue<QTask>();
private int _runningTaskCount;
public void Queue(bool isParallel, Action task)
{
lock (_syncObj)
{
_tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
}
ProcessTaskQueue();
}
public int Count
{
get{lock (_syncObj){return _tasks.Count;}}
}
private void ProcessTaskQueue()
{
lock (_syncObj)
{
if (_runningTaskCount != 0) return;
while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
{
QTask parallelTask = _tasks.Dequeue();
QueueUserWorkItem(parallelTask);
}
if (_tasks.Count > 0 && _runningTaskCount == 0)
{
QTask serialTask = _tasks.Dequeue();
QueueUserWorkItem(serialTask);
}
}
}
private void QueueUserWorkItem(QTask qTask)
{
Action completionTask = () =>
{
qTask.Task();
OnTaskCompleted();
};
_runningTaskCount++;
ThreadPool.QueueUserWorkItem(_ => completionTask());
}
private void OnTaskCompleted()
{
lock (_syncObj)
{
if (--_runningTaskCount == 0)
{
ProcessTaskQueue();
}
}
}
private class QTask
{
public Action Task { get; set; }
public bool IsParallel { get; set; }
}
}
更新
若要处理具有串行和并行任务组合的任务组,GroupedTaskQueue
可以管理每个组的TaskQueue
。同样,您不需要预先了解组,这一切都是在收到任务时动态管理的。
internal class GroupedTaskQueue
{
private readonly object _syncObj = new object();
private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
private readonly string _defaultGroup = Guid.NewGuid().ToString();
public void Queue(bool isParallel, Action task)
{
Queue(_defaultGroup, isParallel, task);
}
public void Queue(string group, bool isParallel, Action task)
{
TaskQueue queue;
lock (_syncObj)
{
if (!_queues.TryGetValue(group, out queue))
{
queue = new TaskQueue();
_queues.Add(group, queue);
}
}
Action completionTask = () =>
{
task();
OnTaskCompleted(group, queue);
};
queue.Queue(isParallel, completionTask);
}
private void OnTaskCompleted(string group, TaskQueue queue)
{
lock (_syncObj)
{
if (queue.Count == 0)
{
_queues.Remove(group);
}
}
}
}
线程池适用于任务的相对顺序无关紧要的情况,前提是它们都完成了。特别是,它们必须并行完成。
如果任务必须按特定顺序完成,则它们不适合并行,因此线程池不合适。
如果要将这些串行任务移出主线程,则具有任务队列的单个后台线程将适用于这些任务。您可以继续将线程池用于适合并行的其余任务。
是的,这意味着您必须根据任务是按顺序任务还是"可以并行化"的任务来决定在哪里提交任务,但这没什么大不了的。
如果您有必须序列化但可以与其他任务并行运行的组,则有多种选择:
- 为每个组创建一个任务,
- 该任务按顺序执行相关的组任务,并将此任务发布到线程池。
- 让组中的每个任务显式等待组中的每个任务,并将它们发布到线程池。这要求线程池可以处理线程正在等待尚未计划的任务而不会死锁的情况。
- 为每个组设置一个专用线程,并在相应的消息队列中发布组任务。
基本上,有许多待处理的任务。某些任务只能在一个或多个其他挂起任务完成执行后执行。
可以在依赖关系图中对挂起的任务进行建模:
- "任务 1 ->任务 2"表示"任务 2 只能在任务 1 完成后执行",箭头指向执行顺序的方向。 任务
- 的入位数(指向它的任务数(决定了任务是否已准备好执行。如果 indegree 为 0,则可以执行。
- 有时一个任务必须等待多个任务完成,然后是>1。
- 如果任务不再需要等待其他任务完成(其入度为零(,则可以将其提交到包含工作线程的线程池,或提交到包含等待由工作线程选取的任务的队列。您知道提交的任务不会导致死锁,因为任务不会等待任何内容。作为优化,您可以使用优先级队列,例如,将首先执行依赖关系图中更多任务所依赖的任务。这也不会引发死锁,因为线程池中的所有任务都可以执行。然而,它会引起饥饿。
- 如果任务完成执行,则可以将其从依赖关系图中删除,从而可能减少其他任务的无度,而这些任务又可以提交到工作线程池中。
因此,(至少(有一个线程用于添加/删除待处理任务,并且存在一个工作线程的线程池。
将任务添加到依赖关系图时,必须检查:
- 任务
- 在依赖关系图中的连接方式:它必须等待哪些任务才能完成,哪些任务必须等待它完成?相应地从新任务中绘制连接。
- 绘制连接后:新连接是否在依赖关系图中导致任何循环?如果是这样,则会出现僵局。
性能:
- 如果并行执行实际上几乎不可能,则此模式比顺序执行慢,因为无论如何,您都需要额外的管理才能几乎按顺序执行所有操作。
- 如果在实践中可以同时执行许多任务,则此模式很快。
假设:
正如您可能已经从字里行间读到的那样,您必须设计任务,以便它们不会干扰其他任务。此外,必须有一种方法来确定任务的优先级。任务优先级应包括每个任务处理的数据。两个任务不能同时更改同一对象;其中一个任务应优先于另一个任务,或者对对象执行的操作必须是线程安全的。
要对线程池执行要执行的操作,可能需要创建某种调度程序。
像这样:
TaskQueue -> Scheduler -> Queue -> ThreadPool
调度程序在其自己的线程中运行,跟踪作业之间的依赖关系。当作业准备好完成时,调度程序只需将其推送到线程池的队列中。
线程池可能必须向调度程序发送信号以指示作业何时完成,以便调度程序可以将依赖于该作业的作业放入队列中。
在您的情况下,依赖项可能存储在链表中。
假设您有以下依赖项:3 -> 4 -> 6 -> 8
作业3 正在线程池上运行,您仍然不知道作业 8 存在。
作业 3 结束。从链表中删除 3,将作业 4 放在线程池的队列中。
约伯8到来。你把它放在链表的末尾。
唯一必须完全同步的构造是调度程序之前和之后的队列。
如果我正确理解了这个问题,jdk 执行器没有此功能,但很容易推出自己的功能。你基本上需要
- 工作线程
- 池,每个工作线程都有一个专用队列
- 对你提供工作的那些队列的一些抽象(c.f. the
ExecutorService
- 某种算法,可以确定地为每项工作选择特定的队列
- 然后,每件工作都会被报价发送到正确的队列,从而以正确的顺序进行处理
与 jdk 执行程序的区别在于,它们有 1 个队列和 n 个线程,但您需要 n 个队列和 m 个线程(其中 n 可能等于也可能不等于 m(
*阅读每个任务都有一个键后编辑 *
更详细一点
- 编写一些代码,将键转换为给定范围内的索引(int(,其中 n 是您想要的线程数(,这可以像
key.hashCode() % n
一样简单,也可以是已知键值到线程或任何您想要的静态映射 - 启动时
- 创建 n 个队列,将它们放在索引结构中(数组,列出任何内容(
- 启动 N 个线程,每个线程只是从队列中执行一个阻塞获取
- 当它收到一些工作时,它知道如何执行特定于该任务/事件的工作(如果你有异构事件,你显然可以将任务映射到操作(
- 将其存储在接受工作项的某个外观后面
- 当任务到达时,将其交给门面
- 外观根据键为任务找到正确的队列,并将其提供给该队列
添加到此方案中非常容易,然后您只需要工作线程向某个管理器注册以声明"我拥有此队列",然后围绕该 + 检测线程中的错误进行一些内务处理(这意味着它取消注册该队列的所有权,将队列返回到空闲队列池,这是启动新线程的触发器(
我认为线程池可以在这种情况下有效使用。这个想法是为每组依赖任务使用单独的strand
对象。您可以使用或不带strand
对象将任务添加到队列中。对依赖任务使用相同的strand
对象。您的计划程序检查下一个任务是否有strand
以及此strand
是否已锁定。如果没有 - 锁定此strand
并运行此任务。如果strand
已锁定 - 请将此任务保留在队列中,直到下一个计划事件。任务完成后,解锁其strand
。
因此,您需要单个队列,不需要任何额外的线程,不需要复杂的组等。 strand
对象可以通过两种方法非常简单 lock
和 unlock
.
我经常遇到相同的设计问题,例如,对于处理多个同时会话的异步网络服务器。当会话内的任务从属时,会话是独立的(这会将它们映射到您的独立任务和从属任务组((这会将会话内部任务映射到组内的从属任务(。使用所描述的方法,我完全避免了会话中的显式同步。每个会话都有自己的strand
对象。
更重要的是,我使用这个想法的现有(伟大的(实现:Boost Asio 库 (C++(。我只是用了他们的术语strand
.实现是优雅的:在调度它们之前,我将异步任务包装到相应的strand
对象中。
选项 1 - 复杂的
由于您有顺序作业,因此您可以将这些作业收集到一个链中,并让作业本身在完成后重新提交到线程池中。假设我们有一个作业列表:
[Task1, ..., Task6]
就像你的例子一样。我们有一个顺序依赖关系,因此[Task3, Task4, Task6]
是一个依赖链。我们现在做一个工作(Erlang伪代码(:
Task4Job = fun() ->
Task4(), % Exec the Task4 job
push_job(Task6Job)
end.
Task3Job = fun() ->
Task3(), % Execute the Task3 Job
push_job(Task4Job)
end.
push_job(Task3Job).
也就是说,我们通过将其包装到一个作业中来更改Task3
作业,该作业作为延续将队列中的下一个作业推送到线程池。这里与一般的延续传递风格有很强的相似之处,在Node.js
或Pythons Twisted
框架等系统中也可以看到。
概括,您可以创建一个系统,您可以在其中定义作业链,这些作业链可以defer
进一步的工作并重新提交进一步的工作。
选项 2 - 简单的一个
为什么我们甚至费心拆分工作?我的意思是,由于它们是顺序依赖的,因此在同一线程上执行所有这些操作不会比获取该链并将其分散到多个线程上更快或更慢。假设"足够"的工作负载,任何线程都将始终有工作,因此将作业捆绑在一起可能是最简单的:
Task = fun() ->
Task3(),
Task4(),
Task6() % Just build a new job, executing them in the order desired
end,
push_job(Task).
如果你有作为一等公民的函数,那么做这样的事情是相当容易的,所以你可以随心所欲地用你的语言构建它们,就像你可以在任何函数式编程语言、Python、Ruby-blocks等中一样。
我不是特别喜欢构建队列或延续堆栈的想法,就像在"选项 1"中一样,我肯定会选择第二个选项。在 Erlang 中,我们甚至有一个名为 jobs
的程序,由 Erlang Solutions 编写并作为开源发布。 jobs
旨在执行和加载此类作业执行。如果要解决此问题,我可能会将选项 2 与工作结合起来。
建议不使用线程池的答案就像硬编码任务依赖关系/执行顺序的知识一样。相反,我将创建一个管理两个任务之间的开始/结束依赖关系的CompositeTask
。通过将依赖关系封装在任务接口后面,可以统一处理所有任务,并将其添加到池中。这将隐藏执行详细信息,并允许更改任务依赖项,而不会影响您是否使用线程池。
这个问题没有指定语言 - 我将使用Java,我希望大多数人都可以阅读。
class CompositeTask implements Task
{
Task firstTask;
Task secondTask;
public void run() {
firstTask.run();
secondTask.run();
}
}
这将在同一线程上按顺序执行任务。您可以将多个CompositeTask
链接在一起,以根据需要创建一系列顺序任务。
这里的缺点是,这会在按顺序执行的所有任务的持续时间内占用线程。您可能希望在第一个任务和第二个任务之间执行其他任务。因此,与其直接执行第二个任务,不如让第二个任务的复合任务计划执行:
class CompositeTask implements Runnable
{
Task firstTask;
Task secondTask;
ExecutorService executor;
public void run() {
firstTask.run();
executor.submit(secondTask);
}
}
这可确保第二个任务在第一个任务完成之前不会运行,并且还允许池执行其他(可能更紧急的(任务。请注意,第一个和第二个任务可以在单独的线程上执行,因此尽管它们不并发执行,但任务使用的任何共享数据都必须对其他线程可见(例如,通过使变量volatile
.(
这是一种简单但功能强大且灵活的方法,它允许任务本身定义执行约束,而不是使用不同的线程池来执行此操作。
使用两个活动对象。简而言之:活动对象模式由优先级队列和 1 个或多个工作线程组成,这些线程可以从队列中获取任务并处理它。
因此,使用一个活动对象和一个工作线程:所有作为队列位置的任务都将按顺序处理。使用工作线程数大于 1 的第二个活动对象。在这种情况下,工作线程将以任何顺序从队列中获取和处理任务。
运气。
我认为你在混合概念。当您想在线程之间分配一些工作时,线程池是可以的,但是如果您开始在线程之间混合依赖关系,那么这不是一个好主意。
我的建议是,不要将线程池用于这些任务。只需创建一个专用线程,并保留一个简单的顺序项队列,这些队列必须由该线程单独处理。然后,您可以在没有顺序要求时继续将任务推送到线程池,并在有顺序要求时使用专用线程。
澄清:使用常识,串行任务队列应由单个线程执行,每个任务一个接一个地处理:)
我了解您的情况,这是可以实现的。基本上你需要做的是做一些聪明的事情来协调你在主线程中的任务。您需要的Java API是ExecutorCompletionService和Callable
首先,实现可调用任务:
public interface MyAsyncTask extends Callable<MyAsyncTask> {
// tells if I am a normal or dependent task
private boolean isDependent;
public MyAsyncTask call() {
// do your job here.
return this;
}
}
然后在主线程中,使用 CompletionService 协调依赖任务的执行(即等待机制(:
ExecutorCompletionService<MyAsyncTask> completionExecutor = new
ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
if (task.isNormal()) {
// if it is a normal task, submit it immediately.
completionExecutor.submit(task);
} else {
if (dependentFutureTask == null) {
// submit the first dependent task, get a reference
// of this dependent task for later use.
dependentFutureTask = completionExecutor.submit(task);
} else {
// wait for last one completed, before submit a new one.
dependentFutureTask.get();
dependentFutureTask = completionExecutor.submit(task);
}
}
}
通过这样做,您使用单个执行器(线程池大小 5(执行正常任务和依赖任务,正常任务在提交后立即执行,依赖任务逐个执行(在提交新的依赖任务之前,通过在 Future 上调用 get(( 在主线程中执行等待(,因此在任何时间点, 您始终有许多正常任务和一个依赖任务(如果存在(在单个线程池中运行。
这只是一个良好的开端,通过使用 ExecutorCompletionService、FutureTask 和信号量,可以实现更复杂的线程协调方案。
您将如何确保这些任务按顺序排列?
push task1
push task2
push task346
push task5
响应编辑:
push task1
push task27 **
push task3468 *
push task5
push task9
您有两种不同类型的任务。将它们混合在一个队列中感觉很奇怪。而不是让一个队列有两个。为了简单起见,您甚至可以同时使用ThreadPoolExecutor。对于串行任务,只需为其提供固定大小 1,对于可以并发执行的任务,只需为其提供更多大小。我不明白为什么这会很笨拙。保持简单和愚蠢。您有两个不同的任务,因此请相应地对待它们。
由于您只需要等待单个任务完成即可启动依赖任务,因此如果可以在第一个任务中安排依赖任务,则可以轻松完成。所以在你的第二个例子中:在任务 2 结束时,计划任务 7和在任务 3 结束时,为 4->6 和 6->8 安排任务 4,依此类推。
一开始,只需安排任务 1、2、5、9 ...其余的应该随之而来。
一个更普遍的问题是,当您必须等待多个任务才能启动依赖任务时。有效地处理这一点是一项不平凡的工作。
通过使用ConcurrentExclusiveSchedulerPair
实例的 ExclusiveScheduler
属性,并在每次启动任务时将其用作TaskScheduler
,在ThreadPool
上串行执行任务非常简单:
var taskFactory = new TaskFactory(
new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
Task task1 = taskFactory.StartNew(() => DoSomething());
Task task2 = taskFactory.StartNew(() => DoSomethingElse());
DoSomething()
和DoSomethingElse
都将在ThreadPool
上运行,一个接一个。可以保证这两个调用不会重叠,并且它们将按照最初计划的相同顺序调用。
但是,如果这些调用中的任何一个失败,会发生什么?问题是:DoSomething()
或DoSomethingElse
抛出的任何异常都将被困在各自的Task
(task1
或task2
(中。这意味着我们不能只是开始任务并忘记它们。我们有责任将任务存储在某个地方,并最终await
它们并处理它们的异常。这可能正是我们想要的。
但是,如果我们只想安排任务并"忘记"它们,并且在不太可能的情况下,它们中的任何一个都无法让异常作为未处理的异常传播并终止进程,该怎么办?这并不像听起来那么疯狂。某些任务可能对应用程序的生命周期至关重要,并且不太可能失败,并且很难设计手动观察其异常的策略,以至于将其异常升级到即时应用程序终止(在引发AppDomain.UnhandledException
事件之后(可能是可用选项中较小的弊端。那么有可能做到这一点吗?是的,但令人惊讶的困难和棘手:
using System.Runtime.ExceptionServices;
var taskFactory = new TaskFactory(
new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
void RunOnThreadPoolExclusive(Action action)
{
_ = taskFactory.StartNew(() =>
{
try
{
action();
}
catch (Exception ex)
{
var edi = ExceptionDispatchInfo.Capture(ex);
ThreadPool.QueueUserWorkItem(_ => edi.Throw());
}
});
}
RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());
action
在 try/catch 块中调用。如果失败,将在ExceptionDispatchInfo
实例中捕获异常,以保留其堆栈跟踪,然后在ThreadPool
上重新引发异常。请注意,taskFactory.StartNew
仍然返回一个 Task
,该 是使用 discard ( _
( 丢弃的,因为现在任务极不可能失败。但我们真的取得了任何进展吗?我们一开始的前提是DoSomething
不太可能失败,我们最终放弃了一个我们认为极不可能失败的Task
。确实不是很满意!我们能做得更好吗?是的!进入臭名昭著的async void
世界:
var taskFactory = new TaskFactory(
new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
async void RunOnThreadPoolExclusive(Action action)
{
await taskFactory.StartNew(action);
}
RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());
async void
方法具有有趣的特征,即在async void
方法启动时捕获的SynchronizationContext
上引发其中的任何异常,或者在ThreadPool
上(作为回退(。因此,例如,如果在 WinForms 应用程序的 UI 线程上调用RunOnThreadPoolExclusive
,并且操作失败,则会弹出一个消息框,询问用户是要继续还是退出应用程序(屏幕截图(。因此,该错误不一定是致命的,因为用户可以选择忽略该错误并继续。这可能正是我们想要的。或者可能不会。
澄清一下,错误将在UI线程上抛出,但仍会在ThreadPool
上调用DoSomething()
/DoSomethingElse()
。这一点没有改变。
那么,我们究竟如何确保错误将被抛到ThreadPool
,而不是其他地方,无论如何,无论当前上下文如何,并且不允许任何任务变得一劳永逸? 方法如下:
var taskFactory = new TaskFactory(
new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
void RunOnThreadPoolExclusive(Action action)
{
Task task = taskFactory.StartNew(action);
ThreadPool.QueueUserWorkItem(async state => await (Task)state, task);
}
RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());
在ThreadPool
上以正确的顺序序列化执行,在ThreadPool
上抛出错误,并且没有泄漏的即发即弃任务。完善!
有一个专门用于此目的的Java框架,称为dexecutor(免责声明:我是所有者(
DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("task1", "task2");
executor.addDependency("task4", "task6");
executor.addDependency("task6", "task8");
executor.addIndependent("task3");
executor.addIndependent("task5");
executor.addIndependent("task7");
executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
任务 1、任务 3、任务 5、任务 7 并行运行(取决于线程池大小(,一旦任务 1
有很多答案,显然有一个已经被接受了。但是为什么不使用延续呢?
如果您有已知的"串行"条件,则当您将第一个任务排队时,请按住该任务;对于其他任务,请调用 Task.ContinueWith((。
public class PoolsTasks
{
private readonly object syncLock = new object();
private Task serialTask = Task.CompletedTask;
private bool isSerialTask(Action task) {
// However you determine what is serial ...
return true;
}
public void RunMyTask(Action myTask) {
if (isSerialTask(myTask)) {
lock (syncLock)
serialTask = serialTask.ContinueWith(_ => myTask());
} else
Task.Run(myTask);
}
}
具有有序和无序执行方法的线程池:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OrderedExecutor {
private ExecutorService multiThreadExecutor;
// for single Thread Executor
private ThreadLocal<ExecutorService> threadLocal = new ThreadLocal<>();
public OrderedExecutor(int nThreads) {
this.multiThreadExecutor = Executors.newFixedThreadPool(nThreads);
}
public void executeUnordered(Runnable task) {
multiThreadExecutor.submit(task);
}
public void executeOrdered(Runnable task) {
multiThreadExecutor.submit(() -> {
ExecutorService singleThreadExecutor = threadLocal.get();
if (singleThreadExecutor == null) {
singleThreadExecutor = Executors.newSingleThreadExecutor();
threadLocal.set(singleThreadExecutor);
}
singleThreadExecutor.submit(task);
});
}
public void clearThreadLocal() {
threadLocal.remove();
}
}
填充所有队列后,应清除线程本地。唯一的缺点是每次方法都会创建单线程执行器
执行有序(可运行任务(
在单独的线程中调用