谁能告诉我这个ZeroMQ代码有什么问题?

本文关键字:什么 问题 代码 ZeroMQ 告诉我 | 更新日期: 2023-09-27 18:02:33

我已经尝试了各种不同的事情,但这基本上是我想做的。建立一个正在进行的公共订阅。我重新运行了这段代码,在尝试将pub套接字连接到转发器设备的前端时,我得到了一个连接拒绝。

代码

string expectedAddress = "XXXX";
        string message = "hello its me";
        int count = 0;
        using (var context = ZmqContext.Create())
        {
            using (var forwarderDevice = new ForwarderDevice(context, "inproc://front", "inproc://back", DeviceMode.Threaded))
            {
                using (var pub = Helper.GetConnectedPublishSocket(context, "inproc://front"))
                {
                    using (var sub = Helper.GetConnectedSubscribeSocket(context, "inproc://back")) 
                    {
                        forwarderDevice.Start();
                        Helper.SendOneSimpleMessage(expectedAddress, message, pub);
                        var zmqMessage = Helper.ReceiveMessage(sub);
                        Assert.AreEqual(count, zmqMessage.FrameCount);
                        Frame frame = zmqMessage[0];
                        var address = Encoding.Unicode.GetString(frame.Buffer);
                        Assert.AreEqual(expectedAddress, address);
                    }
                }
                forwarderDevice.Stop();
            }
        }

好的第一件事我知道我做错了…在创建设备之后,在尝试连接设备之前,应该直接启动设备。第二件事是,设备是在另一个线程上启动的,可能在主线程尝试连接时还没有准备好,所以你需要以某种方式阻止它,直到它准备好。代码现在看起来像:

string expectedAddress = "XXXX";
        string message = "hello its me";
        int count = 0;
        using (var context = ZmqContext.Create())
        {
            using (var forwarderDevice = new ForwarderDevice(context, "inproc://front", "inproc://back", DeviceMode.Threaded))
            {
                forwarderDevice.Start();
                while (!forwarderDevice.IsRunning)
                { }
                using (var pub = Helper.GetConnectedPublishSocket(context, "inproc://front"))
                {
                    using (var sub = Helper.GetConnectedSubscribeSocket(context, "inproc://back")) 
                    {
                        Task<ZmqMessage> task = Task.Run(() =>
                            {
                                var zmqMessage = Helper.ReceiveMessage(sub);
                                return zmqMessage;
                            });
                        Helper.SendOneSimpleMessage(expectedAddress, message, pub);
                        task.Wait();
                        var result = task.Result;
                        Assert.AreEqual(count, result.FrameCount);
                        Frame frame = result[0];
                        var address = Encoding.Unicode.GetString(frame.Buffer);
                        Assert.AreEqual(expectedAddress, address);
                    }
                }
                forwarderDevice.Stop();
            }
        }

我没有得到任何错误,但我从来没有收到消息

谁能告诉我这个ZeroMQ代码有什么问题?

看看后面的代码,一个典型的SUB错误是实际上没有订阅任何内容—您需要setsockopt来订阅"(空字符串)以获取每条消息。如果那是一个PUB/SUB套接字对,除非你在helper中订阅它会过滤消息。