多个线程之间的同步

本文关键字:同步 之间 线程 | 更新日期: 2023-09-27 18:07:32

我有几个线程被称为两个或多个方法。我需要同步它们,所以我尝试使用barrier类:

Barrier barrier = new Barrier(2); // 2 = #threads participating.
bool complete = false;
TaskFactory factory = Task.Factory;
// Start tasks
Task task_1 = factory.StartNew(() =>
{
    process_1.Server("1 and 2");
    barrier.SignalAndWait(); // Wait for task 2 to catch up.
    barrier.SignalAndWait(); // Wait for task 2 to print "2" and set complete = true.
    if (complete)
    {
        process_1.Server("1 and 3");
    }
});
Task task_6 = factory.StartNew(() =>
{
    process_6.Server("6 and 4");
    process_6.Server("6 and 3");
});
Task task_2 = factory.StartNew(() =>
{
    barrier.SignalAndWait(); // Wait for task 1 to print "1".
    process_2.Client("1 and 2");
    complete = true;
    barrier.SignalAndWait(); // Wait for task 1 to read complete as true.
    process_2.Server("2 and 5");
    process_2.Server("2 and 3");
});
Task task_4 = factory.StartNew(() =>
{
    process_4.Client("6 and 4");
    process_4.Server("4 and 7");
    process_4.Server("4 and 3");
});
Task task_5 = factory.StartNew(() =>
{
    process_5.Client("2 and 5");
    process_5.Server("5 and 3");
});
Task task_7 = factory.StartNew(() =>
{
    process_7.Client("4 and 7");
    process_7.Server("7 and 3");
});
Task task_3 = factory.StartNew(() =>
{
    process_3.Client("1 and 3");
    process_3.Client("2 and 3");
    process_3.Client("4 and 3");
    process_3.Client("5 and 3");
    process_3.Client("6 and 3");
    process_3.Client("7 and 3");
});
task_3.Wait();

我需要确保从不同线程调用方法之间的结果,例如:process_1.Server("1 and 2");process_2.Client("1 and 2"); 。在Server之前调用Client方法是不可接受的。所有依赖项:{process_1.Server("1 and 2"); process_2.Client("1 and 2");}, {process_2.Server("2 and 5"); process_5.Client("2 and 5");}, {process_6.Server("6 and 4"); process_4.Client("6 and 4");}, {process_4.Server("4 and 7"); process_7.Client("4 and 7");}, {process_1.Server("1 and 3"); process_3.Client("1 and 3");}, {process_2.Server("2 and 3"); process_3.Client("2 and 3");}, {process_4.Server("4 and 3"); process_3.Client("4 and 3");}, {process_5.Server("5 and 3"); process_3.Client("5 and 3");}, {process_6.Server("6 and 3"); process_3.Client("6 and 3");}, {process_7.Server("7 and 3"); process_3.Client("7 and 3");}.

{...}{...}之间没有依赖关系。因此,可以执行{process_6.Server("6 and 3"); process_3.Client("6 and 3");}, {process_7.Server("7 and 3"); process_3.Client("7 and 3");},反之亦然{process_7.Server("7 and 3"); process_3.Client("7 and 3");}, {process_6.Server("6 and 3"); process_3.Client("6 and 3");}.我写的元素之间有{...}依赖关系。你能帮我解决这个问题吗?我不知道如何实现这一点。

谢谢!

完整的程序代码:

class Pipe
{
    public string message;
    public Pipe()
    {
        message = "";
    }
    public Pipe(string message)
    {
        this.message = message;
    }
    public void Server(object pipeName)
    {
        // Create a name pipe
        using (NamedPipeServerStream pipeStream = new NamedPipeServerStream(pipeName.ToString()))
        {
            // Wait for a connection
            pipeStream.WaitForConnection();
            using (StreamWriter sw = new StreamWriter(pipeStream))
            {
                sw.AutoFlush = true;
                sw.WriteLine(message);
            }
        }
        Console.Write("Communication between processes " + pipeName.ToString());
    }
    public void Client(object pipeName)
    {
        using (NamedPipeClientStream pipeStream = new NamedPipeClientStream(pipeName.ToString()))
        {
            // The connect function will indefinately wait for the pipe to become available
            // If that is not acceptable specify a maximum waiting time (in ms)
            pipeStream.Connect();
            using (StreamReader sr = new StreamReader(pipeStream))
            {
                // We read a line from the pipe and print it together with the current time
                message += sr.ReadLine();
            }
        }
        Console.WriteLine(": client received message.'n");
    }
    static void Main(string[] args)
    {
            Pipe process_1 = new Pipe("Test message from process #1.");
            Pipe process_2 = new Pipe();
            Pipe process_3 = new Pipe();
            Pipe process_4 = new Pipe();
            Pipe process_5 = new Pipe();
            Pipe process_6 = new Pipe("Test message from process #6.");
            Pipe process_7 = new Pipe();
            TaskFactory factory = Task.Factory;
            // Start tasks
            Task task_1 = factory.StartNew(() => { process_1.Server("1 and 2"); process_1.Server("1 and 3"); });
            Task task_6 = factory.StartNew(() => { process_6.Server("6 and 4"); process_6.Server("6 and 3"); });
            Task task_2 = factory.StartNew(() => { process_2.Client("1 and 2"); process_2.Server("2 and 5"); process_2.Server("2 and 3"); });
            Task task_4 = factory.StartNew(() => { process_4.Client("6 and 4"); process_4.Server("4 and 7"); process_4.Server("4 and 3"); });
            Task task_5 = factory.StartNew(() => { process_5.Client("2 and 5"); process_5.Server("5 and 3"); });
            Task task_7 = factory.StartNew(() => { process_7.Client("4 and 7"); process_7.Server("7 and 3"); });
            Task task_3 = factory.StartNew(() => { process_3.Client("1 and 3"); process_3.Client("2 and 3"); process_3.Client("4 and 3"); process_3.Client("5 and 3"); process_3.Client("6 and 3"); process_3.Client("7 and 3"); });
            task_3.Wait();
    }
}

多个线程之间的同步

如果我

理解正确,您需要确保在您的Pipe对象上调用方法Client Server之前永远不会执行对方法的调用。我已将您的示例简化为并添加了一个测试类来记录行为。简化的代码包含更简单的Pipe类形式,现在只将一些字符串放在作为 c'tor 中的参数传入的列表中,而不是创建真正的管道。

同步完全由名为 BlockingPipePipe装饰子类处理。 BlockingPipe使用一些称为条件锁或条件同步的低级机制。Jeff Magee 和 Jeff Kramer 写了一本关于并发模式及其在 Java 中的应用的好书,看看条件同步。(Java( 幻灯片 12-14 或用于 C# 条件同步。在C#中,特别是看看@john斯基特的答案,他指出了另一个很好的参考。该模式包括使用 pulse 方法通知所有等待线程。

足够的理论,回到你的代码。以下是简化的Pipe类:

class Pipe
{
    internal static int counter = 0;
    private readonly int id = counter++;
    private readonly IList<string> calls;
    public Pipe(IList<string> calls) { this.calls = calls; }
    public virtual void Server(string s) { EnqueeCall(s, "server"); }
    public virtual void Client(string s) { EnqueeCall(s, "client"); }
    private void EnqueeCall(string s, string actor)
    {
        calls.Add(actor + id + " processes " + s);
    }
}

现在,BlockingPipe类使用条件同步。给定BlockingPipe对象的条件和状态可以建模为有限状态机。您的BlockingPipe可以处于两种状态 - 服务器调用和服务器未调用。状态用于维护每个方法的此依赖项。子类委托给基类的实现,以便提供更好的分离常用逻辑和同步逻辑:

class BlockingPipe : Pipe
{
    public BlockingPipe(IList<string> calls) : base(calls) { }
    private enum State { ServerCalled, ServerNotCalled }
    private State state = State.ServerNotCalled;
    public override void Server(string s)
    {
        lock (this)
        {
            base.Server(s);
            state = State.ServerCalled;
            Monitor.Pulse(this);
        }
    }
    public override void Client(string s)
    {
        lock (this)
        {
            while (state != State.ServerCalled)
                Monitor.Wait(this, 200);
            base.Client(s);
        }
    }
}

最后一步是测试类。

[TestClass]
public class SomeTestClass
{
    [TestMethod]
    public void TestMethod()
    {
        for (var i = 0; i < 100; i++) Test();
    }
    private static void Test()
    {
        Pipe.counter = 0;
        var list = new List<string>();
        var p = new BlockingPipe(list);
        var f = Task.Factory;
        var b = new Barrier(3);
        f.StartNew(() => { p.Client("asdf"); b.SignalAndWait(); });
        f.StartNew(() => { p.Server("qwer"); b.SignalAndWait(); });
        b.SignalAndWait();
        var exp = String.Join("'n", 
          new[] { "server0 processes qwer", "client0 processes asdf" });
        var act = String.Join("'n", list);
        Assert.AreEqual(exp, act);
    }
}

Test方法可以调用任意多次(希望(始终产生正确的行为。我希望这可以扩展到您的用例。测试检查在Pipe上执行的调用是否为该形式:

server0 进程 qwer 客户端 0 进程 ASDF

两个创建的线程共享 Pipe 对象的同一实例。为了测试此解决方案的鲁棒性,我添加了一个 for 循环,调用实际的 Test 方法一百次,始终产生相同的结果。我注意到的唯一缺陷是,如果不在BlockingPipewhile循环内为Monitor.Wait调用添加超时,条件同步模式本身的实现就不起作用。因为我有一个线程永远等待接收Pulse但脉冲线程已经返回,这可能还需要为这种情况添加另一个条件。

您可能希望创建一个具有自己的任务队列的自定义SynchronizationContext。然后,根据任务的依赖关系选择要执行的任务。此链接可说明如何设置自定义上下文以及如何使用它。