我怎么能只阻止直到第一个线程完成

本文关键字:第一个 线程 怎么能 | 更新日期: 2023-09-27 18:31:10

我最近遇到了一个案例,能够生成一堆线程,阻止并等待一个答案(第一个到达),取消其余线程然后取消阻止会很方便。

例如,假设我有一个采用种子值的搜索函数。 让我们规定搜索函数可以简单并行化。 此外,我们的搜索空间包含许多潜在的解决方案,对于某些种子值,函数将无限期搜索,但至少一个种子值将在合理的时间内产生解决方案。

如果我能完全天真地并行进行此搜索,那就太好了,例如:

let seeds = [|0..100|]
Array.Parallel.map(fun seed -> Search(seed)) seeds

可悲的是,Array.Parallel.map将阻塞,直到所有线程完成。 无赖。 我总是可以在搜索函数中设置超时,但随后我几乎可以肯定等待运行时间最长的线程完成;此外,对于某些问题,超时可能不够长。

简而言之,我想要类似于UNIX套接字select()调用的东西,仅用于任意函数。 这可能吗? 如上所述,它不必处于漂亮的数据并行抽象中,也不必是 F# 代码。 我什至很乐意使用本机库并通过 P/Invoke 调用它。

我怎么能只阻止直到第一个线程完成

您可以创建一堆任务,然后使用 Task.WaitAnyTask.WhenAny 同步等待第一个任务完成,或者分别创建一个将在第一个任务完成时完成的任务。

一个简单的同步示例:

var tasks = new List<Task<int>>();
var cts = new CancellationTokenSource();
for (int i = 0; i < 10; i++)
{
    int temp = i;
    tasks.Add(Task.Run(() =>
    {
        //placeholder for real work of variable time
        Thread.Sleep(1000 * temp);
        return i;
    }, cts.Token));
}
var value = Task.WaitAny(tasks.ToArray());
cts.Cancel();

或者对于异步版本:

public static async Task<int> Foo()
{
    var tasks = new List<Task<int>>();
    var cts = new CancellationTokenSource();
    for (int i = 0; i < 10; i++)
    {
        int temp = i;
        tasks.Add(Task.Run(async () =>
        {
            await Task.Delay(1000 * temp, cts.Token);
            return temp;
        }));
    }
    var value = await await Task.WhenAny(tasks);
    cts.Cancel();
    return value;
}
let rnd = System.Random()
let search seed = async {
    let t = rnd.Next(10000)
    //printfn "seed: %d  ms: %d" seed t
    do! Async.Sleep t
    return sprintf "seed %d finish" seed
}
let processResult result = async {
    //Todo:
    printfn "%s" result
}
let cts = new System.Threading.CancellationTokenSource()
let ignoreFun _ = () //if you don't want handle
let tasks = 
    [0..10]
    |> List.map (fun i ->
        async {
            let! result = search i
            do! processResult result
            cts.Cancel()
        }
    )
Async.StartWithContinuations(Async.Parallel tasks, ignoreFun, ignoreFun, ignoreFun, cts.Token)

尝试使用事件对象同步所有线程,当您找到设置事件的解决方案时,所有其他线程必须定期检查事件状态并停止执行(如果已设置)。

有关更多详细信息,请查看此处。

似乎对我有用

namespace CancellParallelLoops
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();
            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();
            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit");
            });
            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    if (num == 1000) cts.Cancel();
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }
            Console.ReadKey();
        }
    }
}