设计将推方法转换为拉方法的协程线程实现的替代方案

本文关键字:方法 线程 方案 实现 转换 程线程 | 更新日期: 2023-09-27 18:10:04

我有一个集合类,它以压缩格式保存许多不同类型的数据。为了枚举集合中的所有值,它有一个Execute(Query, IDataWriter)方法。你传递给它一个定义你想要的数据的查询,然后对于每一个匹配的数据,它调用你传入的IDataWriter对象上的一个方法。

IDataWriter接口有15个不同的方法,一个用于集合中不同的数据类型。现在我需要将这些数据提交到数据库我希望能够实现IEnumerator<SqlDataRecord>将这些东西提交到数据库。问题在于如何将调用Execute(将大量数据转储到IDataWriter对象(push)中)转换为pull方法,以便IEnumerator的MoveNext和Current可以使用。

我已经看过协程和纤维,但是我发现的例子中没有一个看起来像它们对一个内部不知道协程的现有方法(在我的例子中是Execute)工作。因此,我在伪代码中的计划是使用线程和手动同步执行以下操作。

Execute方法将在一个单独的线程中运行,我将手动等待并在每个IDataWriter方法中发送信号

class EnumeratorAdapterObject : IEnumerator<SqlDataRecord>, IDataWriter
{
public EnumeratorAdapterObject(Store storeObject)
{
    workerThread = new Thread(storeObject.Execute(query, this));
}
public bool MoveNext()
{
    if (firstTimeCalled)
    {
        start worker thread
    }
    else
    {
        signal resume
    }
    block for either a call into an Add method or the Execute thread finishes
    if (not anything buffered)
        return false
    else
        return true
}

// 14 other methods like this implemented in IDataWriter, each with different types
public void Add_Decimal(IntvlDataHeader header, decimal data)
{
    buffer field of current SqlDataRecord = generate record;
    signal main thread
    wait for resume signal
}
public SqlDataRecord Current
{
    get { return buffer field of current SqlDataRecord; }
}
}

这看起来像一个好方法吗?有人知道已经实现这个的任何例子或问题吗?

或者是否有办法利用任何新的4.0特性?我想过使用一个限制为1件事的阻塞并发集合,但是消费者(IEnumerator的MoveNext)如何知道其他线程何时完成添加东西?

设计将推方法转换为拉方法的协程线程实现的替代方案

而不是与信号/等待进行手动线程创建和同步,我发现我可以在完成时使用调用CompleteAdding()的阻塞收集。下面是一个快速的例子,对于我上面的问题,我将把它包装在一个实现IEnumerator<SqlDataRecord>IDataWriter的对象中,因此,我将调用Execute而不是GenerateStuff调用,Add_*方法将调用col.Add(new SqlDataRecord(....))

static void Main(string[] args)
{
    var col = new BlockingCollection<int>(1);
    Task.Factory.StartNew(
        () =>
        {
            GenerateStuff(col);
            col.CompleteAdding();
        });
    while (!col.IsCompleted)
    {
        Thread.Sleep(100);
        int result;
        if (!col.TryTake(out result, -1))
        {
            break;
        }
        Console.WriteLine("Got {0}", result);
    }
    Console.WriteLine("Done Adding!");
}
static void GenerateStuff(BlockingCollection<int> col)
{
    for (int i = 0; i < 10; i++)
    {
        Thread.Sleep(10);
        Console.WriteLine("Adding {0}", i);
        col.Add(i);
        Console.WriteLine("Added {0}", i);
    }
}

这还有一个好处,即运行Execute的工作线程将并发地生成下一个结果,IEnumerator返回Current, sql代码执行提交数据的任何操作。使用手动线程信号,每次只能运行一个线程。