事件存储重复提交异常
本文关键字:提交 异常 存储 事件 | 更新日期: 2023-09-27 17:55:54
更新:我们越来越System.Data.SqlClient.SqlException
.消息是:
违反主键约束"PK_Commits"。无法在对象 'dbo 中插入重复键。提交。''r'语句已终止
似乎EventStore正在使用streamid和commitid作为唯一ID。
我们使用事件存储来追加事件,如下所示。
public bool TryAppend(object[] content)
{
if (content == null)
throw new ArgumentNullException("content");
try
{
using (var stream = m_storage.OpenStream(m_streamID, 0, int.MaxValue))
{
var versionInStore = stream.StreamRevision;
content.ToList().ForEach(m =>
{
var version = ++versionInStore;
var key = string.Format("{0}-{1:00000000}", m.GetType().Name, version);
var savedMessage = new SavedRecord(key, version, m);
stream.Add(new EventMessage { Body = savedMessage });
});
stream.CommitChanges(Guid.NewGuid());
}
return true;
}
catch (Exception e)
{
m_logger.LogError(e);
return false;
}
}
事件存储的配置如下。我们使用 Sql Serer 2008 作为持久性存储。
return Wireup.Init()
.LogToOutputWindow()
.UsingSqlPersistence(m_connectionName)
.WithDialect(new MsSqlDialect())
.EnlistInAmbientTransaction() // two-phase commit
.InitializeStorageEngine()
.UsingJsonSerialization()
.Compress()
.UsingSynchronousDispatchScheduler()
.DispatchTo(new DelegateMessageDispatcher(DispatchCommit))
.Build();
任何想法为什么会得到重复提交异常?
谢谢
遇到了同样的问题;就我而言,这可能是因为不同的线程同时向具有相同 id 的流中添加了不同的事件。已编写以下代码以便能够重试添加事件:
private void TryAddEvent(IStoreEvents storeEvents, IUserEvent anEvent, Guid streamId)
{
var isCommitSuccessful = false;
for (var i = 0; i < 10 && !isCommitSuccessful; i++)
{
try
{
using (var stream = storeEvents.OpenStream(streamId, 0, int.MaxValue))
{
stream.Add(new EventMessage {Body = anEvent});
if (stream.UncommittedEvents.All(e => e.Body != anEvent))
{
stream.Add(new EventMessage {Body = anEvent});
}
stream.CommitChanges(Guid.NewGuid());
}
isCommitSuccessful = true;
}
catch (Exception ex)
{
if (!(ex is SqlException) && !(ex is ConcurrencyException))
{
throw;
}
using (var stream = storeEvents.OpenStream(streamId, 0, int.MaxValue))
{
if (stream.CommittedEvents.Any(e => e.Body == anEvent))
{
isCommitSuccessful = true;
}
}
}
}
if (!isCommitSuccessful)
{
throw new ConcurrencyException(String.Format("Cannot add {0} to event store", anEvent.GetType()));
}
}
希望它能有所帮助。