无法使NetMQ发布-订阅模式与ReceiveReady一起工作

本文关键字:模式 ReceiveReady 一起 工作 NetMQ 发布 | 更新日期: 2023-09-27 18:14:22

我正在尝试使用NetMQ(3.3.3.4)并创建一个发布-订阅模式。

我希望主机/服务器侦听一个端口(9000)上的所有传入数据,并将数据转发给另一个端口(9001)上的所有客户端/订阅者。

客户端将在9000上发送数据,并接收在9001上发送的所有消息。

在文档之后,我创建了类似下面代码的东西,但我不能让它工作。我认为主要是因为ReceiveReady从来没有被调用过!

我认为它应该如何工作:

  • client.Publish应该使host.SubscriberSocket_ReceiveReady中的第一行解除阻塞并将数据传递给另一个套接字
  • 当数据被传递时,它应该出现在客户端无限运行的Task

结果:

  • // This line is never reached上的断点永远不会到达
  • 没有例外。
  • 在主机上切换端口使publish = 9000和subscribe = 9001无效
  • Windows防火墙已关闭,所以不应该有任何阻塞
  • 它没有区别,如果我把地址放入PublisherSocket构造器,或者如果我使用_publisherSocket.Bind(address)在主机或_publisherSocket.Connect(address)在客户端

我做错了什么?

<

主机/strong>

public class MyNetMQHost {
    private NetMQSocket _publishSocket;
    private NetMQSocket _subscribeSocket;
    private NetMQPoller _poller;
    public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
        Task.Factory.StartNew(() => {
            using (_publishSocket = new PublisherSocket(publishAddress))
            using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
            using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
                _subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
                _poller.Run();
            }
        });
    }
    private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
        var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
        _publishSocket.SendMultipartBytes(data);
    }
}

public class MyNetMQClient {
    private readonly NetMQSocket _publishSocket;
    private readonly NetMQSocket _subscribeSocket;
    public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
        _publishSocket = new PublisherSocket(publishAddress);
        _subscribeSocket = new SubscriberSocket(subscribeAddress);
        Task.Factory.StartNew(() => {
            while (true) {
                byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
                int one = 1; // This line is never reached
            }
        });
    }
    public void Publish(byte[] data) {
        _publishSocket.SendFrame(data);
    }
}

测试

public class Tester {
    public void MyTester() {
        MyNetMQHost host = new MyNetMQHost();
        MyNetMQClient client = new MyNetMQClient();
        client.Publish(Encoding.Unicode.GetBytes("Hello world!");
    }
}

无法使NetMQ发布-订阅模式与ReceiveReady一起工作

你的经纪人和客户都不会调用subscribe。在代理上调用subscriber . subscribe(")为所有人订阅。在你的客户端订阅任何你想要的。

在代理中,应该实际使用XSubscriber和XPublisher来移动订阅。这样你就不需要订阅了。你可以使用Proxy类。