NetMQ 消息队列

NetMQ是一个轻量级的消息队列库,它支持多种消息传输模式,比如请求-响应、发布-订阅、推送-拉取等。我最喜欢它的一点是:不需要中间服务器,直接点对点通信,简单高效!

小贴士:NetMQ的性能非常棒,每秒可以处理数百万条消息,而且延迟极低,特别适合实时性要求高的应用。

Install-Package NetMQ

简单的请求-响应模式


// 服务端代码
using NetMQ;
using NetMQ.Sockets;

using (var responder = new ResponseSocket("@tcp://*:5555"))
{
    while (true)
    {
        string msg = responder.ReceiveFrameString();
        Console.WriteLine($"收到请求: {msg}");
        
        // 模拟处理请求
        Thread.Sleep(100);
        
        responder.SendFrame("收到!");
    }
}

// 客户端代码
using (var requester = new RequestSocket(">;tcp://localhost:5555"))
{
    for (int i = 0; i <; 10; i++)
    {
        Console.WriteLine($"发送请求 {i}...");
        requester.SendFrame($"Hello {i}");
        
        string reply = requester.ReceiveFrameString();
        Console.WriteLine($"收到回复: {reply}");
    }
}


发布-订阅模式


// 发布者代码
using (var publisher = new PublisherSocket())
{
    publisher.Bind("tcp://*:5563");
    
    while (true)
    {
        string message = $"新闻播报 {DateTime.Now}";
        publisher.SendFrame(message);
        Thread.Sleep(1000);
    }
}

// 订阅者代码
using (var subscriber = new SubscriberSocket())
{
    subscriber.Connect("tcp://localhost:5563");
    subscriber.Subscribe("");  // 订阅所有消息
    
    while (true)
    {
        string message = subscriber.ReceiveFrameString();
        Console.WriteLine($"收到消息: {message}");
    }
}

消息队列模式

// 任务分发者
using (var pushSocket = new PushSocket("@tcp://*:5557"))
{
    for (int i = 0; i <; 100; i++)
    {
        pushSocket.SendFrame($"任务 {i}");
    }
}

// 工作者
using (var pullSocket = new PullSocket(">;tcp://localhost:5557"))
{
    while (true)
    {
        string task = pullSocket.ReceiveFrameString();
        Console.WriteLine($"处理任务: {task}");
    }
}

使用Poller实现多路复用

using (var subscriber1 = new SubscriberSocket(">;tcp://localhost:5556"))
using (var subscriber2 = new SubscriberSocket(">;tcp://localhost:5557"))
{
    subscriber1.Subscribe("");
    subscriber2.Subscribe("");
    
    var poller = new NetMQPoller { subscriber1, subscriber2 };
    
    subscriber1.ReceiveReady += (s, a) =>;
    {
        string message = a.Socket.ReceiveFrameString();
        Console.WriteLine($"从源1收到: {message}");
    };
    
    subscriber2.ReceiveReady += (s, a) =>;
    {
        string message = a.Socket.ReceiveFrameString();
        Console.WriteLine($"从源2收到: {message}");
    };
    
    poller.Run();
}

备注 性能优化

使用长连接:避免频繁创建和销毁socket

批量处理:使用SendMoreFrame发送多帧消息

合理使用缓冲区:通过设置HWM(高水位标记)控制内存使用

注意事项:使用using语句确保资源正确释放,避免内存泄漏。在生产环境中要做好异常处理!

小伙伴们,今天的.NET学习之旅就到这里啦!记得动手敲代码,有问题随时在评论区问我哦。祝大家学习愉快,.NET学习节节高!


作者:spike

分类: Net

创作时间:2025-04-05

更新时间:2025-04-09

联系方式放在中括号之中例如[[email protected]],回复评论在开头加上标号例如:#1