数据库事务中的并发线程导致中的严重延迟.NET核心EF核心

本文关键字:核心 延迟 NET EF 事务 并发 线程 数据库 | 更新日期: 2023-09-27 18:08:21

背景

我正在尝试编写一个异步服务器,它可以与SQLite DB一起工作。我正在使用。NET核心与实体框架核心。

我将UnitOfWork与GenericRepository模式结合使用,但正如下面的示例代码所示,这与这些模式实际上并不相关。

我正在使用Windows 10,但我希望任何支持。NET核心平台的行为相同。

我想要实现的目标

我想要实现的是一个合理的交易行为。我已经将整个问题简化为一个简单的场景,在这个场景中,如果某个对象存在,我会查看数据库,如果不存在,我就会添加它,否则我会失败。整个操作是在一个事务中进行的,预期的场景是:

一个

  1. 数据库中不存在该对象
  2. 线程1检查该对象是否存在于数据库中,并发现它不存在
  3. 线程1将对象添加到数据库中
  4. 对象存在于数据库中

两个

  1. 数据库中存在对象
  2. 线程1检查对象是否存在于数据库中,并查看它是否存在
  3. 线程1报告失败,因为对象存在
  4. 对象仍然存在于数据库中

三个

  1. 数据库中不存在该对象
  2. 线程1检查该对象是否存在于数据库中,并发现它不存在
  3. 线程2检查该对象是否存在于数据库中,并发现它不存在
  4. 线程1尝试将对象添加到数据库中
  5. 线程2尝试将对象添加到数据库中
  6. 线程1或线程2成功地将对象添加到数据库中,另一个线程由于事务约束而失败
  7. 对象存在于数据库中

显然,场景一和场景二可以完美地工作,因为只有一个线程在操作。问题出在数字三上。

问题

问题是,在第三种情况下,当我们进入第五步时,整个事情就爆炸了。两个线程都有30秒的挂起时间,大多数时候两个线程均无法将对象添加到数据库中。

我知道如何使用全局应用程序锁定轻松解决此问题,但我想知道是否有可能在不锁定的情况下解决此问题并为数据库访问保留异步/等待功能。

有时,一个线程设法添加了对象,另一个线程失败了,但即使这样,两个线程也需要30秒才能完成操作,这是完全不可用的。

样本输出

17:41:18|first: Started
17:41:19|main: Press ENTER
17:41:19|second: Started
17:41:20|second: Object does not exist, entering wait ...
17:41:20|first: Object does not exist, entering wait ...
17:41:22|first: Wait done
17:41:22|second: Wait done
17:41:22|first: Call Insert
17:41:22|second: Call Insert
17:41:22|second: Call SaveThrowAsync
17:41:22|first: Call SaveThrowAsync
17:41:22|first: Call Commit
17:41:52|second: Exception: Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred while updating the entries. See the inner exception for details. ---> Microsoft.Data.Sqlite.SqliteException: SQLite Error 5: 'database is locked'.
   at Microsoft.Data.Sqlite.Interop.MarshalEx.ThrowExceptionForRC(Int32 rc, Sqlite3Handle db)
   at Microsoft.Data.Sqlite.SqliteCommand.ExecuteReader(CommandBehavior behavior)
   at Microsoft.Data.Sqlite.SqliteCommand.ExecuteReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
   at Microsoft.Data.Sqlite.SqliteCommand.<ExecuteDbDataReaderAsync>d__53.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.EntityFrameworkCore.Storage.Internal.RelationalCommand.<ExecuteAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.<ExecuteAsync>d__32.MoveNext()
   --- End of inner exception stack trace ---
   at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.<ExecuteAsync>d__32.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.<ExecuteAsync>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.<SaveChangesAsync>d__47.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.<SaveChangesAsync>d__45.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.EntityFrameworkCore.DbContext.<SaveChangesAsync>d__30.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at ConsoleApp1.UnitOfWork.<SaveThrowAsync>d__6.MoveNext() in X:'Dev'NetCore'shit'test1'src'ConsoleApp1'UnitOfWork.cs:line 35
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at ConsoleApp1.Program.<ThreadProc>d__2.MoveNext() in X:'Dev'NetCore'shit'test1'src'ConsoleApp1'Program.cs:line 72
17:41:52|first: Exception: Microsoft.Data.Sqlite.SqliteException: SQLite Error 5: 'database is locked'.
   at Microsoft.Data.Sqlite.Interop.MarshalEx.ThrowExceptionForRC(Int32 rc, Sqlite3Handle db)
   at Microsoft.Data.Sqlite.SqliteCommand.ExecuteReader(CommandBehavior behavior)
   at Microsoft.Data.Sqlite.SqliteCommand.ExecuteNonQuery()
   at Microsoft.Data.Sqlite.SqliteTransaction.Commit()
   at Microsoft.EntityFrameworkCore.Storage.RelationalTransaction.Commit()
   at ConsoleApp1.Program.<ThreadProc>d__2.MoveNext() in X:'Dev'NetCore'shit'test1'src'ConsoleApp1'Program.cs:line 75
17:41:52|second: Finished
17:41:52|first: Finished
17:42:00|main: We have 0 object(s) in the database.

代码

我试着把所有不相关的东西都剪掉,尽量减少。如果您想运行该程序,只需在Visual Studio中创建这些文件,等待。NET Core项目同步,编译项目,运行"先添加迁移"answers"更新数据库"来创建数据库,然后您就可以运行它了。如果没有Visual Studio,您需要使用"dotnet"answers"dotnet ef"命令。

程序.cs:

using Microsoft.EntityFrameworkCore.Storage;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
  public class Program
  {
    public static void Main(string[] args)
    {
      Thread thread1 = new Thread(new ParameterizedThreadStart(ThreadProc));
      thread1.Start("first");
      Thread.Sleep(1000);
      Thread thread2 = new Thread(new ParameterizedThreadStart(ThreadProc));
      thread2.Start("second");
      log("main", "Press ENTER");
      Console.ReadLine();
      using (UnitOfWork uow = new UnitOfWork())
      {
        IEnumerable<DatabaseObject> dbos = uow.DatabaseObjectRepository.GetAsync().Result;
        log("main", "We have {0} object(s) in the database.", dbos.Count());
        foreach (DatabaseObject dbo in dbos)
          log("main", " -> id:{0}, value:{1}", dbo.DatabaseObjectId, dbo.Value);
      }
    }
    public static void log(string Id, string Format, params object[] Args)
    {
      string prefix = string.Format("{0}|{1}: ", DateTime.Now.ToString("HH:mm:ss"), Id);
      string msg = string.Format(prefix + Format, Args);
      Console.WriteLine(msg);
    }
    public async static void ThreadProc(object State)
    {
      string id = (string)State; 
      log(id, "Started", id);
      int ourObjectId = 1234;
      using (UnitOfWork uow = new UnitOfWork())
      {
        using (IDbContextTransaction transaction = await uow.BeginTransactionAsync())
        {
          bool rollback = false;
          try
          {
            DatabaseObject dbo = (await uow.DatabaseObjectRepository.GetAsync(o => o.DatabaseObjectId == ourObjectId)).FirstOrDefault();
            if (dbo == null)
            {
              log(id, "Object does not exist, entering wait ...");
              await Task.Delay(2000); // Same result with Thread.Sleep(2000) instead.
              log(id, "Wait done");
              dbo = new DatabaseObject()
              {
                DatabaseObjectId = ourObjectId,
                Value = id
              };
              log(id, "Call Insert");
              uow.DatabaseObjectRepository.Insert(dbo);
              log(id, "Call SaveThrowAsync");
              await uow.SaveThrowAsync();
              log(id, "Call Commit");
              transaction.Commit(); // .NET Core should commit automatically on transaction Dispose, but that does not work for me.
            }
            else
            {
              log(id, "Object already exists");
              rollback = true;
            }
          }
          catch (Exception exception)
          {
            log(id, "Exception: {0}", exception.ToString());
          }
          if (rollback)
          {
            log(id, "Rolling back");
            transaction.Rollback();
          }
        }
      }
      log(id, "Finished");
    }
  }
}

UnitOfWork.cs:

using Microsoft.EntityFrameworkCore.Storage;
using System;
using System.Threading.Tasks;
namespace ConsoleApp1
{
  public class UnitOfWork : IDisposable
  {
    private DatabaseContext context = null;
    public DatabaseContext Context
    {
      get
      {
        if (context == null)
          context = new DatabaseContext();
        return context;
      }
    }
    private GenericRepository<DatabaseObject> databaseObjectRepository;
    public GenericRepository<DatabaseObject> DatabaseObjectRepository
    {
      get
      {
        if (databaseObjectRepository == null)
          databaseObjectRepository = new GenericRepository<DatabaseObject>(Context);
        return databaseObjectRepository;
      }
    }
    public async Task SaveThrowAsync()
    {
      await Context.SaveChangesAsync();
    }
    public async Task<IDbContextTransaction> BeginTransactionAsync()
    {
      return await Context.Database.BeginTransactionAsync();
    }

    private bool disposed = false;
    public void Dispose()
    {
      Dispose(true);
      GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool Disposing)
    {
      if (disposed) return;
      if (Disposing)
      {
        if (context != null) context.Dispose();
        context = null;
        disposed = true;
      }
    }
  }
}

GenericRepository.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore;
namespace ConsoleApp1
{
  public class GenericRepository<TEntity> where TEntity : class
  {
    internal DatabaseContext context;
    internal DbSet<TEntity> dbSet;
    public GenericRepository(DatabaseContext context)
    {
      this.context = context;
      dbSet = context.Set<TEntity>();
    }
    public virtual async Task<IEnumerable<TEntity>> GetAsync(Expression<Func<TEntity, bool>> filter = null)
    {
      IQueryable<TEntity> query = dbSet;
      if (filter != null)
        query = query.Where(filter);
      List<TEntity> result = await query.ToListAsync();
      return result;
    }
    public virtual void Insert(TEntity entity)
    {
      dbSet.Add(entity);
    }
    public virtual void Update(TEntity entityToUpdate)
    {
      dbSet.Attach(entityToUpdate);
      context.Entry(entityToUpdate).State = EntityState.Modified;
    }
  }
}

DatabaseObject.cs:

namespace ConsoleApp1
{
  public class DatabaseObject
  {
    public int DatabaseObjectId { get; set; }
    public string Value { get; set; }
  }
}

DatabaseContext.cs:

using Microsoft.EntityFrameworkCore;
namespace ConsoleApp1
{
  public class DatabaseContext : DbContext
  {
    public DbSet<DatabaseObject> DatabaseObjects { get; set; }
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
      optionsBuilder.UseSqlite("Filename=mysqlite.db");
    }
  }
}

project.json:

{
  "version": "1.0.0-*",
  "buildOptions": {
    "emitEntryPoint": true
  },
  "dependencies": {
    "Microsoft.EntityFrameworkCore.Sqlite": "1.0.0",
    "Microsoft.EntityFrameworkCore.Design": {
      "version": "1.0.0-preview2-final",
      "type": "build"
    },
    "Microsoft.NETCore.App": {
      "version": "1.0.0"
    },
    "System.Runtime.InteropServices": "4.1.0",
    "Microsoft.EntityFrameworkCore.Tools": "1.0.0-preview2-final"
  },
  "frameworks": {
    "netcoreapp1.0": {
      "imports": "dnxcore50"
    }
  },
  "tools": {
    "Microsoft.EntityFrameworkCore.Tools": "1.0.0-preview2-final"
  },
  "runtimes": {
    "win10-x64": {}
  }
}

数据库事务中的并发线程导致中的严重延迟.NET核心EF核心

我在将其与EF6SQLite一起使用时遇到了类似的问题。这个问题很突出,因为您试图在不关闭连接的情况下使用可重复用于选择和更新操作的连接。尝试使用带有using关键字的本地DbContext。这将在dbContext使用后对其进行处置。你至少可以避免目前遇到的异常情况。

SQLite中的另一条规则是,只有一个连接可以执行写入操作

因此,在执行任何其他操作之前,我们需要确保写入连接已关闭,以便使写入可用于其他连接。