c#检查线程/后台任务是否已完成或需要中止

本文关键字:已完成 是否 检查 线程 后台任务 | 更新日期: 2023-09-27 18:28:11

我对控制台/窗口服务有一个相当简单的要求(我可以在任何一种模式下运行它)应用程序:

  1. 从数据库中获取要处理的项目列表
  2. 在后台启动一个方法(只有一个,不需要更多)来处理一个项目
  3. 检查它是否已完成或需要终止(通过sql查找)
  4. 完成时重复2次/中止下一次
  5. 不再时,睡一会儿,重复1

我是c#/.net的新手,可以看到各种各样的线程系统。对于这种情况,线程和任务哪个更好?

在线程的情况下,我假设每个项目都要处理类似于(这个粗略的代码):

Thread thread = new MyThread(new ThreadStart(this.SomeFunction));
thread.Start();
while(!finished) {
  if (!thread.IsAlive())
    finished=true; 
  else {
     //check database for early termination of job
     terminate=SomeChdck();
     if(terminate) { thread.Abort(); finished=true;}
  }
}

//返回并重复

或者在Task的情况下,它类似于(同样,粗略地完成,并从网络上截取):

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
Task task = Task.Factory.StartNew(this.SomeFunction, token);
while(!finished) {
  if (task.IsCompleted)
    finished=true;
  else {
     //check database for early termination of job
     terminate=SoneChdck();
     if(terminate) { tokenSource.Cancel(); finished=true;}
  }
}

//返回并重复

这两种方法有区别吗(假设它们都能工作),我在某个地方读到Thread.Artrt()被弃用,但文档中没有提到。

谢谢。

c#检查线程/后台任务是否已完成或需要中止

这通常被称为生产者/消费者问题。你有一个线程(你的主线程),它将为你的后台线程(消费者)生成许多项目。

net的任务并行库中的BlockingCollection对此非常有用。看看这篇文章:http://blogs.msdn.com/b/csharpfaq/archive/2010/08/12/blocking-collection-and-the-producer-consumer-problem.aspx

此外,我建议您查看Darek在他的帖子中建议的"来自并行扩展插件的管道"项目。

一般来说,我建议Tasks或ThreadPool(按顺序)不要手动新建线程。线程池旨在通过池化来减少构造多个线程的开销。任务使用线程池。应该很少需要手动创建线程。

使用任务工厂。您可以使用等待任务完成

Task.WaitAll(task);

而不是示例代码中的while循环。

此外,CancelationToken用于向任务发出应该取消的信号,因此您需要自己执行检查。

http://msdn.microsoft.com/EN-US/library/vstudio/dd997396(v=vs.100).aspx

然而,如果您考虑并行扩展插件的Pipeline,您会更好,它也支持取消令牌。通过这种方式,您可以从DB中检索记录,然后将它们作为IEnumerable传递给管道,同时您可以自由地从单独的或内部线程中取消所有记录。您可以开始处理第一条记录,而其余记录则在后台检索。Pipeline将为其要处理的每个元素的每个步骤创建一个后台任务。每个步骤的默认平行度为1。它非常快速高效。

更新

Dapper、Parallel Extensions Extras和Reactive Extensions 的小示例

var pipeline = Pipeline.Create<SomeType, bool>(st =>
{
    //Do something with st
    return someBool; //some bool if you succeeded or not
});
var cts = new CancellationTokenSource();
//cancel after 10s (just for fun)
Observable.Timer(TimeSpan.FromSeconds(10)).Subscribe(s => cts.Cancel());
using (var conn = new SqlConnection("someConnectionString"))
{
    conn.Open();
    pipeline.Process(conn.Query<SomeType>("SOME SQL HERE", buffered:true),cts.Token).ToList();
}

选择此选项的原因是为了证明使用Dapper是多么容易,并行扩展插件是多么强大和方便,但对于您的示例,它是故意过度设计的…:)我希望你能原谅我。结尾的ToList()是必需的,否则不会对IEnumerable执行任何操作。或者你可以使用这种方法:

Console.WriteLine(
    pipeline.Process(conn.Query<SomeType>("SOME SQL HERE", buffered: true), cts.Token).All(b => b)
        ? "All records processed successfully"
        : "Some records failed");

如果您想从数据处理步骤内部取消,请首先声明cts:

var cts = new CancellationTokenSource();
var pipeline = Pipeline.Create<SomeType,bool>(st =>
{
    //Do something with st
    //you could even cancel from here
    if(someOtherBool)
        cts.Cancel();
    return someBool; //some bool if you succeeded or not for example
});

如果你不想声明一个特定的类型:

var cts = new CancellationTokenSource();
var pipeline = Pipeline.Create<dynamic,bool>(d =>
{
    //Do something with data
    if(someOtherBool)
        cts.Cancel();
    return someBool; //some bool if you succeeded or not
});
using (var conn = new SqlConnection("someConnectionString"))
{
    conn.Open();
    foreach (var b in pipeline.Process(conn.Query("SOME SQL HERE", buffered: true), cts.Token))
    {
        Console.WriteLine(b?"Success":"Failure");
    }
}

最后要提到的是cts。Cancel()通常在内线程上引发异常,因此如果需要,请将管道封装在try/catch中。

更新2

在阅读了作者的评论后,我仍然会选择Dapper、PEE和Rx的组合(双关语)。

var cts = new CancellationTokenSource();
var pipeline = Pipeline.Create<dynamic, dynamic>(d =>
{
    //Do something with data in step 1
    if (someConditionalCheck)
        cts.Cancel();
    return d; 
}).Next<dynamic>(d =>
{
    //do something with data is step 2
    if(someConditionalCheck)
        cts.Cancel();
    return d;
});
subscription = Observable.Interval(TimeSpan.FromMinutes(1)).Subscribe(_ =>
{
    try
    {
        using (var conn = new SqlConnection("someConnectionString"))
        {
            conn.Open();
            foreach (var v in pipeline.Process(conn.Query("SOME SQL HERE", buffered: true), cts.Token))
            {
                //Do something with or ignore the result
            }
        }
    }
    catch (AggregateException e)
    {
        //Investigate what happened, could be error in processing 
        //or operation cancelled
    }
    catch (Exception e)
    {
        //All other exceptions
    }
});

Rx可以让我创建一个整洁的可观察对象,它每分钟都会发射一次。我还可以设计一个在前一次跑步一段时间不活动后启动的,在这种情况下我更喜欢间隔。

PEE让我创建了一个整洁的工作流程,在那里我可以指定对从数据库检索的一个数据项执行的多个步骤。访问CancellationTokenSource可以让我在每一步完成后立即取消所有步骤,因此,如果一条记录在步骤1中,另一条在步骤N中,那么一旦完成各自的代码块,这两条记录都将被取消。

Dapper只是一个与数据库对话的时间服务器。

然而,正如你所知,我并没有真正使用线程或任务,所以我在这里回答作者的问题吗?不是。相反,我为他提供了一个替代方案,我认为这更适合他的数据处理场景。

但如果非要我选择的话,我仍然会坚持使用任务工厂,因为它比自己管理线程更精简、更方便。

希望这能有所帮助。