动作内的c# async

本文关键字:async | 更新日期: 2023-09-27 18:04:29

我想写一个方法,它接受几个参数,包括一个动作和重试次数,并调用它。

我有这样的代码:

public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
    {
        object lockObj = new object();
        int index = 0;
        return new Action(async () =>
        {
            while (true)
            {
                T item;
                lock (lockObj)
                {
                    if (index < source.Count)
                    {
                        item = source[index];
                        index++;
                    }
                    else
                        break;
                }
                int retry = retries;
                while (retry > 0)
                {
                    try
                    {
                        bool res = await action(item);
                        if (res)
                            retry = -1;
                        else
                            //sleep if not success..
                            Thread.Sleep(200);
                    }
                    catch (Exception e)
                    {
                        LoggerAgent.LogException(e, method);
                    }
                    finally
                    {
                        retry--;
                    }
                }
            }
        }).RunParallel(threads);
    }

RunParallel是Action的一个扩展方法,它看起来像这样:

public static IEnumerable<Task> RunParallel(this Action action, int amount)
    {
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < amount; i++)
        {
            Task task = Task.Factory.StartNew(action);
            tasks.Add(task);
        }
        return tasks;
    }

现在,问题是:线程正在消失或崩溃,而没有等待操作完成。

我写了这个示例代码:

private static async Task ex()
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < 1000; i++)
        {
            ints.Add(i);
        }
        var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) =>
        {
            try
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + num + "  " + test[0]);
                return test[0] == "test";
            }
            catch (Exception e)
            {
                Console.WriteLine(e.StackTrace);
                return false;
            }
        }, 5, "test");
        await Task.WhenAll(tasks);
    }

fetchSmthFromDb是一个简单的任务>,它从数据库中获取一些东西,在这个例子之外调用时工作得很好。

每当List<string> test = await fetchSmthFromDb();行被调用时,线程似乎正在关闭,Console.WriteLine("#" + num + " " + test[0]);甚至没有被触发,也在调试断点时从未命中。

最终工作代码

private static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
    {
        while (true)
        {
            try
            {
                await action();
                break;
            }
            catch (Exception e)
            {
                LoggerAgent.LogException(e, method);
            }
            if (retryCount <= 0)
                break;
            retryCount--;
            await Task.Delay(200);
        };
    }
    public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
    {
        Func<T, Task> newAction = async (item) =>
        {
            await DoWithRetries(async ()=>
            {
                await action(item);
            }, retries, method);
        };
        await source.ParallelForEachAsync(newAction, threads);
    }

动作内的c# async

问题在这一行:

return new Action(async () => ...

使用async lambda函数启动异步操作,但不返回等待任务。也就是说,它运行在工作线程上,但你永远不会知道它什么时候完成。程序在异步操作完成之前就终止了——这就是为什么看不到任何输出。

必须是:

return new Func<Task>(async () => ...

首先,您需要拆分方法的职责,这样您就不会将重试策略(不应该硬编码为布尔结果的检查)与并行运行的任务混合在一起。

然后,如前所述,您运行while (true)循环100次,而不是并行执行操作。

正如@MachineLearning指出的,使用Task.Delay而不是Thread.Sleep

总的来说,你的解决方案看起来像这样:

using System.Collections.Async;
static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
    while (true)
    {
        try
        {
            await action();
            break;
        }
        catch (Exception e)
        {
            LoggerAgent.LogException(e, method);
        }
        if (retryCount <= 0)
            break;
        retryCount--;
        await Task.Delay(millisecondsDelay: 200);
    };
}
static async Task Example()
{
    List<int> ints = new List<int>();
    for (int i = 0; i < 1000; i++)
        ints.Add(i);
    Func<int, Task> actionOnItem =
        async item =>
        {
            await DoWithRetries(async () =>
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + item + "  " + test[0]);
                if (test[0] != "test")
                    throw new InvalidOperationException("unexpected result"); // will be re-tried
            },
            retryCount: 5,
            method: "test");
        };
    await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}

您需要使用AsyncEnumerator NuGet包,以便从System.Collections.Async命名空间使用ParallelForEachAsync扩展方法。

除了最后的完整的重新设计,我认为强调原始代码的真正错误是非常重要的。

0)首先,正如@Serge Semenov立即指出的,Action必须被

取代
Func<Task>

但是还有另外两个重要的变化。

1)使用异步委托作为参数时,有必要使用最近的Task。运行代替旧模式的新TaskFactory。StartNew(否则你必须显式地添加Unwrap())

2)而且ex()方法不能是异步的,因为Task。WhenAll必须用Wait()方法等待,不能用await方法。

在这一点上,即使有逻辑错误需要重新设计,从纯粹的技术角度来看,它确实有效并且产生了输出。

在线测试:http://rextester.com/HMMI93124