如何使用异步操作正确获取数据

本文关键字:获取 数据 何使用 异步操作 | 更新日期: 2023-09-27 18:25:54

我需要通过等待已经运行的操作的结果来处理并发请求。

private readonly object _gettingDataTaskLock = new object();
private Task<Data> _gettingDataTask;
public virtual Data GetData(Credential credential)
{
    Task<Data> inProgressDataTask = null;
    lock (_gettingDataTaskLock)
    {
        if (_gettingDataTask == null)
        {
            _gettingDataTask = Task.Factory.StartNew(() 
                                                    => GetDataInternal(credential));
            _gettingDataTask.ContinueWith((task) =>
            {
                lock (_gettingDataTaskLock)
                {
                    _gettingDataTask = null;
                }
             },
             TaskContinuationOptions.ExecuteSynchronously);
         }
          inProgressDataTask = _gettingDataTask;
     }
     try
     {
         return inProgressDataTask.Result;
     }
     catch (AggregateException ex)
     {
         _logger.ErrorException(ex, "An error occurs during getting full data");
         throw ex.InnerException;
      }
 }

我有一个潜在的问题:几个调用GetData可能使用不同的credentials。首先使用正确的凭据进行呼叫,然后使用错误凭据进行呼叫。这两个请求得到了与第一个相同的答案。如何解决这个问题?

有没有什么方法可以简化这些代码,使其线程安全且无错误?

如何使用异步操作正确获取数据

更新:

这里似乎实现了双锁检查,就像在singleton模式中一样。CCD_ 3只要执行就被缓存。例如,如果它在100ms内执行,并且每10ms运行一个GetData(),则前10个调用将使用相同的GetDataInternal()

TaskContinuationOptions.ExecuteSynchronously用于确保在同一线程中运行continuation。

 public virtual Data GetData(Credential credential)
 {
     Task<Data> inProgressDataTask = Task.Factory.StartNew(() => GetDataInternal(credential));
     try
     {
         return inProgressDataTask.Result;
     }
     catch (AggregateException ex)
     {
         _logger.ErrorException(ex, "An error occurs during getting full data");
         throw ex.InnerException;
     }
 }

异步/等待场景:

Data GetInternalData()更改为async Task<Data> GetInternalData()

将方法GetData()更改为

public virtual async Task<Data> GetData()
{
     try
     {
         return await GetInternalData();
     }
     catch (AggregateException ex)
     {
         _logger.ErrorException(ex, "An error occurs during getting full data");
         throw ex.InnerException;
     }
 }

以下是我对需求的解释:

  • 对数据的请求可能同时带有相同/不同的凭据
  • 对于每个唯一的凭据集,最多可以有一个正在进行的GetDataInternal调用,当它准备好时,该调用的结果将返回给所有排队的服务员
  • 在此之后,上一次调用的结果将无效,并且将允许具有相同凭据集的新GetDataInternal调用
  • 允许使用不同凭据并行调用GetDataInternal

很简单。

private readonly Dictionary<Credential, Lazy<Data>> Cache
    = new Dictionary<Credential, Lazy<Data>>();
public Data GetData(Credential credential)
{
    if (credential == null)
    {
        // Pass-through, no caching.
        return GetDataInternal(null);
    }
    Lazy<Data> lazy;
    lock (Cache)
    {
        if (!Cache.TryGetValue(credential, out lazy))
        {
            // ExecutionAndPublication is the default LazyThreadSafetyMode, but I 
            // wanted to spell it out to drive the point home: this Lazy instance 
            // will only allow a single call to GetDataInternal, even if multiple
            // threads query its Value property simultaneously.
            lazy = new Lazy<Data>(
                () => GetDataInternal(credential),
                LazyThreadSafetyMode.ExecutionAndPublication
            );
            Cache.Add(credential, lazy);
        }
    }
    // We have released the lock on Cache to allow other threads
    // (with potentially *different* credentials) to access the
    // cache and proceed with their own work in parallel with us.
    Data data;
    try
    {
        // Wait for the GetDataInternal call to complete.
        data = lazy.Value;
    }
    finally
    {
        // At this point the call to GetDataInternal has completed and Data is ready.
        // We will invalidate the cache entry if another thread hasn't done so already.
        lock (Cache)
        {
            // The lock is required to ensure the atomicity of our "fetch + compare + remove" operation.
            // *ANY* thread is allowed to invalidate the cached value, not just the thread that created it.
            // This ensures that the cache entry is cleaned up sooner rather than later.
            // The equality check on the Lazy<Data> instance ensures that the cache entry
            // is not cleaned up too soon, and prevents the following race:
            // (assume all operations use identical credentials)
            // - Thread A creates and stores a Lazy<Data> instance in the cache.
            // - Thread B fetches the Lazy<Data> instance created by Thread A.
            // - Threads A and B access Lazy<Data>.Value simultaneously.
            // - Thread B wins the race and enters the second (this) protected
            //   region and invalidates the cache entry created by Thread A.
            // - Thread C creates and stores a *NEW* Lazy<Data> instance in the cache.
            // - Thread C accesses its Lazy<Data>.Value.
            // - Thread A finally gets to invalidate the cache, and OOPS, Thread C's cache
            //   entry is invalidated before the call to Lazy<Data>.Value has completed.
            // With the equality check in place, Thread A will *not*
            // invalidate the cache entry created by another thread.
            Lazy<Data> currentLazy;
            if (Cache.TryGetValue(credential, out currentLazy) && lazy == currentLazy)
            {
                // Need to invalidate.
                Cache.Remove(credential);
            }
        }
    }
    return data;
}

需要注意的事项:

  • 凭证必须覆盖EqualsGetHashCode,以便进行相等性比较
  • 如果您选择将此方法设为虚拟方法,请小心覆盖此方法
  • 删除了错误处理,将重点放在"肉"上

如果您使用的是.NET 4.5或更高版本,上面的转换为Task<Data>返回方法,并进行了一些小的调整(如果您使用.NET 4.0,则会进行一些大的恶意更改)。请告诉我这是否是一个要求,如果是,您要针对的是.NET的哪个版本。

编辑:我添加了一个try/finally-应该从一开始就在那里。

此外,这里有一个ConcurrentDictionary版本,我在其中绕过了它的局限性——你会注意到它只是稍微干净了一点:

private readonly ConcurrentDictionary<Credential, Lazy<Data>> Cache
    = new ConcurrentDictionary<Credential, Lazy<Data>>();
public Data GetData(Credential credential)
{
    if (credential == null)
    {
        // Pass-through, no caching.
        return GetDataInternal(null);
    }
    // This instance will be thrown away if a cached
    // value with our "credential" key already exists.
    Lazy<Data> newLazy = new Lazy<Data>(
        () => GetDataInternal(credential),
        LazyThreadSafetyMode.ExecutionAndPublication
    );
    Lazy<Data> lazy = Cache.GetOrAdd(credential, newLazy);
    bool added = ReferenceEquals(newLazy, lazy); // If true, we won the race.
    Data data;
    try
    {
        // Wait for the GetDataInternal call to complete.
        data = lazy.Value;
    }
    finally
    {
        // Only the thread which created the cache value
        // is allowed to remove it, to prevent races.
        if (added) {
            Cache.TryRemove(credential, out lazy);
        }
    }
    return data;
}

当您使用MultithreadingMultiprocessing时,您必须执行一些操作来管理它们。CCD_ 19是一个庞大而复杂的课题。您的代码可能会出现不同的情况;假设在运行这行代码之前:

return inProgressDataTask.Result;

每个线程执行这行代码:

_gettingDataTask = null;

这是可能的,因为Task s是在async中运行的。注意,inProgressDataTask是指_gettingDataTask所指的地方:

inProgressDataTask = _gettingDataTask;

_gettingDataTask对象在所有任务(线程)之间共享,这可能会导致程序出错。如果你想使用它,在这种情况下,你可以使用类似Singleton模式的东西。如果使用共享对象,请不要在代码的锁部分中使用Task.Factory.StartNew_gettingDataTask.ContinueWith(以异步方式运行)。

或者假设在运行这行代码之前:

_gettingDataTask = null;

其他一些线程执行这行代码:

if (_gettingDataTask == null)

这使得与第一个线程相关的所有答案都被执行。

为了澄清问题,筛选新项目,并在其中添加以下代码:

class Test
{
    private readonly object _gettingDataTaskLock = new object();
    private Task<int> _gettingDataTask;
    public int GetData(int credential)
    {
        Task<int> inProgressDataTask = null;
        Console.WriteLine(credential + "-TEST0");
        lock (_gettingDataTaskLock)
        {
            Console.WriteLine(credential + "-TEST1");
            if (_gettingDataTask == null)
            {
                Console.WriteLine(credential + "-TEST2");
                _gettingDataTask = Task.Factory.StartNew(()
                    => GetDataInternal(credential));
                _gettingDataTask.ContinueWith((task) =>
                {
                    lock (_gettingDataTaskLock)
                    {
                        Console.WriteLine(credential + "-TEST3");
                        _gettingDataTask = null;
                        Console.WriteLine(credential + "-TEST4");
                    }
                },
                    TaskContinuationOptions.ExecuteSynchronously);
                Console.WriteLine(credential + "-TEST5");
            }
            //_gettingDataTask.Wait();
            Console.WriteLine(credential + "-TEST6");
            inProgressDataTask = _gettingDataTask;
            Console.WriteLine(credential + "-TEST7");
        }
        Console.WriteLine(credential + "-TEST8");
        try
        {
            Console.WriteLine(credential + "-TEST9");
            return inProgressDataTask.Result;
        }
        catch (AggregateException ex)
        {
            throw ex.InnerException;
        }
    }
    private int GetDataInternal(int credential)
    {
        return credential;
    }
}

在主要方法中写入:

private static void Main(string[] args)
{
    var test = new Test();
    var results = new List<int>();
    for (int i = 0; i < 5; i++)
        results.Add(test.GetData(i));
    Task.WaitAll();
    Console.WriteLine(string.Join(",",results));
    Console.ReadLine();
}

现在看看结果。

最后,根据给出的描述,代码的正确实现之一如下:

public virtual Data GetData(Credential credential)
{
    Task<Data> inProgressDataTask = null;
    lock (_gettingDataTaskLock)
    {
        inProgressDataTask = Task.Factory.StartNew(() => GetDataInternal(credential));
        inProgressDataTask.ContinueWith((task) =>
        {
            lock (_gettingDataTaskLock)
            {
                //inProgressDataTask = null;
            }
        },
        TaskContinuationOptions.ExecuteSynchronously);
    }
    try
    {
        return inProgressDataTask.Result;
    }
    catch (AggregateException ex)
    {
        throw ex.InnerException;
    }
}