c#检查线程/后台任务是否已完成或需要中止
本文关键字:已完成 是否 检查 线程 后台任务 | 更新日期: 2023-09-27 18:28:11
我对控制台/窗口服务有一个相当简单的要求(我可以在任何一种模式下运行它)应用程序:
- 从数据库中获取要处理的项目列表
- 在后台启动一个方法(只有一个,不需要更多)来处理一个项目
- 检查它是否已完成或需要终止(通过sql查找)
- 完成时重复2次/中止下一次
- 不再时,睡一会儿,重复1
我是c#/.net的新手,可以看到各种各样的线程系统。对于这种情况,线程和任务哪个更好?
在线程的情况下,我假设每个项目都要处理类似于(这个粗略的代码):
Thread thread = new MyThread(new ThreadStart(this.SomeFunction));
thread.Start();
while(!finished) {
if (!thread.IsAlive())
finished=true;
else {
//check database for early termination of job
terminate=SomeChdck();
if(terminate) { thread.Abort(); finished=true;}
}
}
//返回并重复
或者在Task的情况下,它类似于(同样,粗略地完成,并从网络上截取):
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
Task task = Task.Factory.StartNew(this.SomeFunction, token);
while(!finished) {
if (task.IsCompleted)
finished=true;
else {
//check database for early termination of job
terminate=SoneChdck();
if(terminate) { tokenSource.Cancel(); finished=true;}
}
}
//返回并重复
这两种方法有区别吗(假设它们都能工作),我在某个地方读到Thread.Artrt()被弃用,但文档中没有提到。
谢谢。
这通常被称为生产者/消费者问题。你有一个线程(你的主线程),它将为你的后台线程(消费者)生成许多项目。
net的任务并行库中的BlockingCollection对此非常有用。看看这篇文章:http://blogs.msdn.com/b/csharpfaq/archive/2010/08/12/blocking-collection-and-the-producer-consumer-problem.aspx
此外,我建议您查看Darek在他的帖子中建议的"来自并行扩展插件的管道"项目。
一般来说,我建议Tasks或ThreadPool(按顺序)不要手动新建线程。线程池旨在通过池化来减少构造多个线程的开销。任务使用线程池。应该很少需要手动创建线程。
使用任务工厂。您可以使用等待任务完成
Task.WaitAll(task);
而不是示例代码中的while循环。
此外,CancelationToken用于向任务发出应该取消的信号,因此您需要自己执行检查。
http://msdn.microsoft.com/EN-US/library/vstudio/dd997396(v=vs.100).aspx
然而,如果您考虑并行扩展插件的Pipeline,您会更好,它也支持取消令牌。通过这种方式,您可以从DB中检索记录,然后将它们作为IEnumerable传递给管道,同时您可以自由地从单独的或内部线程中取消所有记录。您可以开始处理第一条记录,而其余记录则在后台检索。Pipeline将为其要处理的每个元素的每个步骤创建一个后台任务。每个步骤的默认平行度为1。它非常快速高效。
更新
Dapper、Parallel Extensions Extras和Reactive Extensions 的小示例
var pipeline = Pipeline.Create<SomeType, bool>(st =>
{
//Do something with st
return someBool; //some bool if you succeeded or not
});
var cts = new CancellationTokenSource();
//cancel after 10s (just for fun)
Observable.Timer(TimeSpan.FromSeconds(10)).Subscribe(s => cts.Cancel());
using (var conn = new SqlConnection("someConnectionString"))
{
conn.Open();
pipeline.Process(conn.Query<SomeType>("SOME SQL HERE", buffered:true),cts.Token).ToList();
}
选择此选项的原因是为了证明使用Dapper是多么容易,并行扩展插件是多么强大和方便,但对于您的示例,它是故意过度设计的…:)我希望你能原谅我。结尾的ToList()是必需的,否则不会对IEnumerable执行任何操作。或者你可以使用这种方法:
Console.WriteLine(
pipeline.Process(conn.Query<SomeType>("SOME SQL HERE", buffered: true), cts.Token).All(b => b)
? "All records processed successfully"
: "Some records failed");
如果您想从数据处理步骤内部取消,请首先声明cts:
var cts = new CancellationTokenSource();
var pipeline = Pipeline.Create<SomeType,bool>(st =>
{
//Do something with st
//you could even cancel from here
if(someOtherBool)
cts.Cancel();
return someBool; //some bool if you succeeded or not for example
});
如果你不想声明一个特定的类型:
var cts = new CancellationTokenSource();
var pipeline = Pipeline.Create<dynamic,bool>(d =>
{
//Do something with data
if(someOtherBool)
cts.Cancel();
return someBool; //some bool if you succeeded or not
});
using (var conn = new SqlConnection("someConnectionString"))
{
conn.Open();
foreach (var b in pipeline.Process(conn.Query("SOME SQL HERE", buffered: true), cts.Token))
{
Console.WriteLine(b?"Success":"Failure");
}
}
最后要提到的是cts。Cancel()通常在内线程上引发异常,因此如果需要,请将管道封装在try/catch中。
更新2
在阅读了作者的评论后,我仍然会选择Dapper、PEE和Rx的组合(双关语)。
var cts = new CancellationTokenSource();
var pipeline = Pipeline.Create<dynamic, dynamic>(d =>
{
//Do something with data in step 1
if (someConditionalCheck)
cts.Cancel();
return d;
}).Next<dynamic>(d =>
{
//do something with data is step 2
if(someConditionalCheck)
cts.Cancel();
return d;
});
subscription = Observable.Interval(TimeSpan.FromMinutes(1)).Subscribe(_ =>
{
try
{
using (var conn = new SqlConnection("someConnectionString"))
{
conn.Open();
foreach (var v in pipeline.Process(conn.Query("SOME SQL HERE", buffered: true), cts.Token))
{
//Do something with or ignore the result
}
}
}
catch (AggregateException e)
{
//Investigate what happened, could be error in processing
//or operation cancelled
}
catch (Exception e)
{
//All other exceptions
}
});
Rx可以让我创建一个整洁的可观察对象,它每分钟都会发射一次。我还可以设计一个在前一次跑步一段时间不活动后启动的,在这种情况下我更喜欢间隔。
PEE让我创建了一个整洁的工作流程,在那里我可以指定对从数据库检索的一个数据项执行的多个步骤。访问CancellationTokenSource可以让我在每一步完成后立即取消所有步骤,因此,如果一条记录在步骤1中,另一条在步骤N中,那么一旦完成各自的代码块,这两条记录都将被取消。
Dapper只是一个与数据库对话的时间服务器。
然而,正如你所知,我并没有真正使用线程或任务,所以我在这里回答作者的问题吗?不是。相反,我为他提供了一个替代方案,我认为这更适合他的数据处理场景。
但如果非要我选择的话,我仍然会坚持使用任务工厂,因为它比自己管理线程更精简、更方便。
希望这能有所帮助。