如何使用 ZeroMQ 接收数据

本文关键字:数据 ZeroMQ 何使用 | 更新日期: 2023-09-27 18:33:05

我有一个简单的应用程序来发送和接收数据。

ZSocketExample client = new ZSocketExample("127.0.0.1:5555");
client.send("test");

这是我的客户端类:

public class ZSocketExample:IDisposable
{
    public delegate void ReceiveEventHandler(object sender, SocketEventArgs e);
    public event ReceiveEventHandler ReceiveEvent;
    private ZmqContext zmqContext;
    private ZmqSocket zmqSocket;
    private string host;
    private bool isRunning;
    private bool disposed = false;
    public ZSocketExample(string host)
    {
        try
        {
            zmqContext = ZmqContext.Create();
            zmqSocket = zmqContext.CreateSocket(SocketType.DEALER);
            ZHelpers.SetID(zmqSocket, Encoding.UTF8);
            zmqSocket.Connect(host);
            this.isRunning = true;
            zmqSocket.ReceiveReady += new EventHandler<SocketEventArgs>(zmqSocket_ReceiveReady);
            zmqSocket.SendReady += new EventHandler<SocketEventArgs>(zmqSocket_SendReady);
            Poller poller = new Poller(new List<ZmqSocket> { zmqSocket });
            while (isRunning)
            {
                poller.Poll(TimeSpan.FromSeconds(5));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
    void zmqSocket_ReceiveReady(object sender, SocketEventArgs e)
    {
        Console.WriteLine("Receive Ready");
    }
    void zmqSocket_SendReady(object sender, SocketEventArgs e)
    {
        Console.WriteLine("Send Ready");
    }
    public void send(string msg)
    {
        zmqSocket.Send(msg, Encoding.UTF8);
        if (ReceiveEvent != null)
            ReceiveEvent(this, null);
    }
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
            return;
        if (disposing)
        {
            close();
        }
        disposed = true;
    }
    public void close()
    {
        isRunning = false;
        zmqSocket.Linger = TimeSpan.FromSeconds(1);
        zmqSocket.Close();
        zmqContext.Terminate();
    }
}

但不知何故,它不发送或接收。有人可以告诉我我做错了什么吗?此示例阻止了主应用程序。如何使其不阻塞?

如何使用 ZeroMQ 接收数据

ZeroMQ 套接字不是线程安全的,如果不使用某种同步,就不能从多个线程使用它。

在您的示例中,您调用 close 设置了一个变量,然后立即关闭套接字,这是错误的,您应该在退出 while 循环时关闭套接字。

关于接收/发送就绪,您很少需要注册发送就绪,发送就绪会通知您何时可以发送消息,如果您已连接,则在经销商插座中您将随时准备好发送(除非高水位已达到)。

当有消息准备好接收时,将调用接收就绪,如果另一端向您发送消息,则将调用接收就绪。

对于最后一部分,阻塞,你需要一个专用线程来处理 zeromq 套接字,你可以让一个线程使用轮询器处理多个套接字。