使用反应式扩展 - 这是异步的

本文关键字:异步 反应式 扩展 | 更新日期: 2023-09-27 18:32:12

尝试将现有数据访问代码转换为异步代码并遇到 Rx,因为您无法在方法主体中返回带有yield returnTask<IEnumerable<T>>

我写了这个,但不确定它的异步,所以指针感激地收到

public class EmployeeRepository : IEmployeeRepository
{
    public IAsyncEnumerable<Employee> GetEmployees()
    {
        return Enumerable().ToAsyncEnumerable();
    }
    private IEnumerable<Employee> Enumerable()
    {
        using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings["DBConnString"].ConnectionString))
        {
            connection.Open();
            using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES", connection))
            {
                using (var reader = command.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        yield return
                            new Employee()
                                {
                                    Id = ReadField<int>(reader, "Id"),
                                    Name = ReadField<string>(reader, "Name")
                                };
                    }
                }
            }
        }
    }
    private static T ReadField<T>(IDataRecord reader, string fieldName)
    {
        var value = reader[fieldName];
        return value == DBNull.Value ? default(T) : (T)value;
    }
}

使用反应式扩展 - 这是异步的

这不是

异步的。 ToAsyncEnumerable创建一个简单的适配器,该适配器阻止每次调用MoveNext。返回这样的异步适配器是一种不好的做法,与执行Task.Run(() => BlockingMethod())相同。它向用户隐藏了实现效率低下的问题,如果他们知道它的存在,他们可能已经能够以更好的方式解决。

IAsyncEnumerable没有语言集成的 yield 特性,但它是可以模拟的。我有代码来做到这一点,但公平警告这会产生一些开销:

IAsyncEnumerable<Employee> async = AsyncEnumerableEx.Create<Employee>(
                                                  async (y, cancellationToken) =>
{
    using (var connection = new SqlConnection(ConfigurationManager
                            .ConnectionStrings["DBConnString"].ConnectionString))
    {
        await connection.OpenAsync(cancellationToken);
        using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES",
                                            connection))
        {
            using (var reader = await
                                   command.ExecuteReaderAsync(cancellationToken))
            {
                while (await reader.ReadAsync(cancellationToken))
                {
                    await y.YieldReturn(new Employee()
                    {
                        Id = ReadField<int>(reader, "Id"),
                        Name = ReadField<string>(reader, "Name")
                    });
                }
            }
        }
    }
});

如果你想使用实际的Rx,它内置了一个几乎相同的Observable.Create实用程序。由于削减了一些等待开销,它将更有效率。

IObservable<Employee> async = Observable.Create<Employee>(
                                                    async (obs, cancellationToken) =>
{
    using (var connection = new SqlConnection(ConfigurationManager
                            .ConnectionStrings["DBConnString"].ConnectionString))
    {
        await connection.OpenAsync(cancellationToken);
        using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES",
                                            connection))
        {
            using (var reader = await
                                   command.ExecuteReaderAsync(cancellationToken))
            {
                while (await reader.ReadAsync(cancellationToken))
                {
                    obs.OnNext(new Employee()
                    {
                        Id = ReadField<int>(reader, "Id"),
                        Name = ReadField<string>(reader, "Name")
                    });
                }
            }
        }
    }
});

如果你想使用 Rx,试试这样的事情:

public IObservable<Employee> GetEmployees()
{
    return Observable.Create<Employee>(o =>
        Observable.Using(() => new SqlConnection(ConfigurationManager
            .ConnectionStrings["DBConnString"].ConnectionString),
            connection =>
                Observable.Using(() =>
                {
                    connection.Open();
                    return new SqlCommand(
                        @"SELECT * FROM EMPLOYEES", connection);
                },
                    command =>
                        Observable.Using(() => command.ExecuteReader(),
                            reader =>
                                Observable.Generate(
                                    0,
                                    x => reader.Read(),
                                    x => x,
                                    x => new Employee()
                                {
                                    Id = ReadField<int>(reader, "Id"),
                                    Name = ReadField<string>(reader, "Name")
                                }, Scheduler.Default)))).Subscribe(o));
}