线程安全的数据缓冲区,用于批量插入大小可控的数据
本文关键字:数据 插入 用于 安全 缓冲区 线程 | 更新日期: 2023-09-27 18:04:36
我有一个模拟,生成的数据必须保存到数据库。
ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
ComplexDataSet cds = GenerateData(r);
SaveDataToDatabase(cds);
});
模拟生成大量数据,因此首先生成数据然后将其保存到数据库(最多1gb的数据)是不实际的,并且逐个保存到数据库也没有意义(太小的事务不实用)。我想把它们插入到数据库中,作为一个控制大小的批量插入(比如一次提交100个)。
然而,我认为我对并行计算的了解并不那么理论化。我想到了这个(正如你所看到的是非常有缺陷的):
DataBuffer buffer = new DataBuffer(...);
ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
ComplexDataSet cds = GenerateData(r);
buffer.SaveDataToBuffer(cds, i == r - 1);
});
public class DataBuffer
{
int count = 0;
int limit = 100
object _locker = new object();
ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataBagQueue{ get; set; }
public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
{
lock (_locker)
{
if(count >= limit)
{
ConcurrentBag<ComplexDataSet> dequeueRef;
if(ComplexDataBagQueue.TryDequeue(out dequeueRef))
{
Commit(dequeueRef);
}
_lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
ComplexDataSetsQueue.Enqueue(_lastItemRef);
count = 1;
}
else
{
// First time
if(_lastItemRef == null)
{
_lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
ComplexDataSetsQueue.Enqueue(_lastItemRef);
count = 1;
}
// If buffer isn't full
else
{
_lastItemRef.Add(data);
count++;
}
}
if(isfinalcycle)
{
// Commit everything that hasn't been committed yet
ConcurrentBag<ComplexDataSet> dequeueRef;
while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
{
Commit(dequeueRef);
}
}
}
}
public void Commit(ConcurrentBag<ComplexDataSet> data)
{
// Commit data to database..should this be somehow in another thread or something ?
}
}
如您所见,我使用queue创建一个缓冲区,然后手动决定何时提交。然而,我有一种强烈的感觉,这不是很执行解决方案,我的问题。首先,我不确定我是否正确地进行了锁定。其次,我不确定这是否是完全线程安全的(或者根本不是)。
你能看一下并评论一下我应该怎么做吗?或者是否有更好的方法(使用某种生产者-消费者技术或其他方法)?
谢谢你,并致以最良好的祝愿。d .不需要使用锁或昂贵的并发安全数据结构。数据是完全独立的,所以引入锁和共享只会损害性能和可伸缩性。
Parallel.For
有一个重载,允许您指定每个线程的数据。在这里,您可以存储私有队列和私有数据库连接。
还有:Parallel.For
内部将您的范围划分为更小的块。传递一个大的范围是非常有效的,所以没有什么需要改变的。
Parallel.For(0, 10000000, () => new ThreadState(),
(i, loopstate, threadstate) =>
{
ComplexDataSet data = GenerateData(i);
threadstate.Add(data);
return threadstate;
}, threadstate => threadstate.Dispose());
sealed class ThreadState : IDisposable
{
readonly IDisposable db;
readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();
public ThreadState()
{
// initialize db with a private MongoDb connection.
}
public void Add(ComplexDataSet cds)
{
queue.Enqueue(cds);
if(queue.Count == 100)
{
Commit();
}
}
void Commit()
{
db.Write(queue);
queue.Clear();
}
public void Dispose()
{
try
{
if(queue.Count > 0)
{
Commit();
}
}
finally
{
db.Dispose();
}
}
}
现在,MongoDb目前不支持真正的并发插入——它在服务器上持有一些昂贵的锁,所以并行提交不会给你带来多少(如果有的话)速度。他们想在将来解决这个问题,所以有一天你可能会得到一个免费的加速。
如果你需要限制数据库连接的数量,一个生产者/消费者设置是一个很好的选择。您可以使用BlockingCollection
队列来高效地完成此操作,而无需使用任何锁:
// Specify a maximum of 1000 items in the collection so that we don't
// run out of memory if we get data faster than we can commit it.
// Add() will wait if it is full.
BlockingCollection<ComplexDataSet> commits =
new BlockingCollection<ComplexDataSet>(1000);
Task consumer = Task.Factory.StartNew(() =>
{
// This is the consumer. It processes the
// "commits" queue until it signals completion.
while(!commits.IsCompleted)
{
ComplexDataSet cds;
// Timeout of -1 will wait for an item or IsCompleted == true.
if(commits.TryTake(out cds, -1))
{
// Got at least one item, write it.
db.Write(cds);
// Continue dequeuing until the queue is empty, where it will
// timeout instantly and return false, or until we've dequeued
// 100 items.
for(int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
{
db.Write(cds);
}
// Now that we're waiting for more items or have dequeued 100
// of them, commit. More can be continue to be added to the
// queue by other threads while this commit is processing.
db.Commit();
}
}
}, TaskCreationOptions.LongRunning);
try
{
// This is the producer.
Parallel.For(0, 1000000, i =>
{
ComplexDataSet data = GenerateData(i);
commits.Add(data);
});
}
finally // put in a finally to ensure the task closes down.
{
commits.CompleteAdding(); // set commits.IsFinished = true.
consumer.Wait(); // wait for task to finish committing all the items.
}
在您的示例中,您有10 000 000个工作包。每一个都需要分配给一个线程。假设您没有真正的大量的cpu内核,这不是最佳的。您还必须在每次buffer.SaveDataToBuffer
调用时同步线程(通过使用锁)。此外,您应该意识到,在时序视图中,变量r
不一定要增加1(例如:Thread1执行1、2、3,Thread2执行4、5、6)。按时间顺序,这将导致r
依次传递到SaveDataToBuffer
(大约)(1、4、2、5、3、6)。
我会使工作包更大,然后一次提交每个包。这样做的另一个好处是,您不必经常锁定/同步所有内容。
下面是一个例子:
int total = 10000000;
int step = 1000;
Parallel.For(0, total / step, (r, state) =>
{
int start = r * start;
int end = start + step;
ComplexDataSet[] result = new ComplexDataSet[step];
for (int i = start; i < end; i++)
{
result[i - start] = GenerateData(i);
}
Commit(result);
});
在这个例子中,整个工作被分成10000个包(并行执行),每个包生成1000个数据项,并将它们提交给数据库。
对于这个解决方案,Commit
方法可能是一个瓶颈,如果不明智的设计。最好是使其线程安全而不使用任何锁。如果在需要同步的线程之间不使用公共对象,这是可以实现的。
。对于sql server后端,这意味着在每个Commit()
调用的上下文中创建自己的sql连接:
private void Commit(ComplexDataSet[] data)
{
using (var connection = new SqlConnection("connection string..."))
{
connection.Open();
// insert your data here...
}
}
与其增加软件的复杂性,不如考虑简化。您可以将代码重构为三个部分:
-
排队的工作
这是concurrent GenerateData in Parallel。对于进行一些繁重的计算并生成ComplexDataSet。
实际队列 一个存储来自[1]的结果的并发队列——那么多ComplexDataSet。这里我假设一个ComplexDataSet的实例实际上并不真正消耗资源,而且相当轻。只要队列是并发的,它就支持并行的"插入"answers"删除"。
退出队列的worker
从处理队列[2]中获取一个ComplexDataSet实例并将其放入并发包(或其他存储)的代码。一旦袋子有N个您阻塞的项目,停止退出队列,将袋子的内容刷新到数据库中并清除它。最后,解除阻塞并继续排队列。
这是一些元代码(它仍然可以编译,但需要改进)
[1]
// [1] - Class is responsible for generating complex data sets and
// adding them to processing queue
class EnqueueWorker
{
//generate data and add to queue
internal void ParrallelEnqueue(ConcurrentQueue<ComplexDataSet> resultQueue)
{
Parallel.For(1, 10000, (i) =>
{
ComplexDataSet cds = GenerateData(i);
resultQueue.Enqueue(cds);
});
}
//generate data
ComplexDataSet GenerateData(int i)
{
return new ComplexDataSet();
}
}
[3]
//[3] This guy takes sets from the processing queue and flush results when
// N items have been generated
class DequeueWorker
{
//buffer that holds processed dequeued data
private static ConcurrentBag<ComplexDataSet> buffer;
//lock to flush the data to the db once in a while
private static object syncRoot = new object();
//take item from processing queue and add it to internal buffer storage
//once buffer is full - flush it to the database
internal void ParrallelDequeue(ConcurrentQueue<ComplexDataSet> resultQueue)
{
buffer = new ConcurrentBag<ComplexDataSet>();
int N = 100;
Parallel.For(1, 10000, (i) =>
{
//try dequeue
ComplexDataSet cds = null;
var spinWait = new SpinWait();
while (cds == null)
{
resultQueue.TryDequeue(out cds);
spinWait.SpinOnce();
}
//add to buffer
buffer.Add(cds);
//flush to database if needed
if (buffer.Count == N)
{
lock (syncRoot)
{
IEnumerable<ComplexDataSet> data = buffer.ToArray();
// flush data to database
buffer = new ConcurrentBag<ComplexDataSet>();
}
}
});
}
}
[2]和用法
class ComplexDataSet { }
class Program
{
//processing queueu - [2]
private static ConcurrentQueue<ComplexDataSet> processingQueue;
static void Main(string[] args)
{
// create new processing queue - single instance for whole app
processingQueue = new ConcurrentQueue<ComplexDataSet>();
//enqueue worker
Task enqueueTask = Task.Factory.StartNew(() =>
{
EnqueueWorker enqueueWorker = new EnqueueWorker();
enqueueWorker.ParrallelEnqueue(processingQueue);
});
//dequeue worker
Task dequeueTask = Task.Factory.StartNew(() =>
{
DequeueWorker dequeueWorker = new DequeueWorker();
dequeueWorker.ParrallelDequeue(processingQueue);
});
}
}