Muta.MessageQueue.RabbitMq 1.0.0.1

There is a newer version of this package available.
See the version list below for details.
dotnet add package Muta.MessageQueue.RabbitMq --version 1.0.0.1
                    
NuGet\Install-Package Muta.MessageQueue.RabbitMq -Version 1.0.0.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Muta.MessageQueue.RabbitMq" Version="1.0.0.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Muta.MessageQueue.RabbitMq" Version="1.0.0.1" />
                    
Directory.Packages.props
<PackageReference Include="Muta.MessageQueue.RabbitMq" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Muta.MessageQueue.RabbitMq --version 1.0.0.1
                    
#r "nuget: Muta.MessageQueue.RabbitMq, 1.0.0.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Muta.MessageQueue.RabbitMq@1.0.0.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Muta.MessageQueue.RabbitMq&version=1.0.0.1
                    
Install as a Cake Addin
#tool nuget:?package=Muta.MessageQueue.RabbitMq&version=1.0.0.1
                    
Install as a Cake Tool

一个面向 .NET 的 RabbitMQ 异步消息队列封装库,提供统一、简洁、可扩展的消息发布与订阅 API,帮助业务快速接入消息驱动架构。

特性

  • 异步优先:基于 async/await,适配高并发场景
  • 统一接口抽象:通过 IMessageQueueService 屏蔽底层 MQ 实现细节
  • 支持字符串与泛型消息:开箱即用的 JSON 序列化/反序列化能力
  • 自动确认 / 手动确认:满足不同可靠性与业务控制需求
  • Dead Letter Queue(DLQ)支持:支持失败消息转入死信队列
  • 消费超时控制:支持单条消息处理超时取消
  • 连接自动恢复:断线重连、拓扑恢复(RabbitMQ Client Recovery)
  • 发布通道池化:降低频繁创建 Channel 的开销,提升吞吐
  • 线程安全设计:适用于多线程并发发布/消费场景

适用场景

  • 订单、支付、库存等业务解耦
  • 异步任务削峰填谷
  • 事件驱动架构(EDA)
  • 失败重试与死信处理
  • 微服务间轻量消息通信

快速开始

  1. 注入并连接 RabbitMqService
  2. 使用 PublishAsync 发布消息
  3. 使用 SubscribeAsyncSubscribeManualAckAsync 消费消息
  4. 按需使用 Ack/Nack/Reject 与 DLQ

设计目标

  • 简单可用:业务方无需关心 RabbitMQ 复杂细节
  • 稳定可靠:优先保证连接与消费链路稳定性
  • 性能平衡:在通用业务场景下提供良好吞吐表现

配置

appsettings.json 中添加:

{
  "MessageQueue": {
    "RabbitMQ": {
      "Hostname": "127.0.0.1",
      "Port": "5672",
      "Username": "guest",
      "Password": "guest"
    }
  }
}

注册服务(DI)

builder.Services.AddMutaRabbitMqMessageQueue();

使用方式

1)连接 MQ

public class MqHostedService : IHostedService
{
    private readonly IMessageQueueService _mq;

    public MqHostedService(IMessageQueueService mq)
    {
        _mq = mq;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _mq.ConnectAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _mq.CloseAsync(cancellationToken);
    }
}

2)发布消息

发布字符串消息
await _mq.PublishAsync("demo.topic", "hello rabbitmq");
发布对象消息(自动 JSON 序列化)
var order = new { OrderId = 1001, Amount = 88.5 };
await _mq.PublishAsync("order.created", order);

3)订阅消息(自动确认模式)

订阅字符串
await _mq.SubscribeAsync(
    "demo.topic",
    async (msg, ct) =>
    {
        Console.WriteLine($"收到消息: {msg}");
        return true; // true => Ack,false => Nack/Reject
    },
    maxConcurrency: 10,
    timeoutMs: 5000,
    useDeadLetterQueue: false);
订阅对象
public class OrderCreatedEvent
{
    public int OrderId { get; set; }
    public decimal Amount { get; set; }
}

await _mq.SubscribeAsync<OrderCreatedEvent>(
    "order.created",
    async (eventData, ct) =>
    {
        Console.WriteLine($"订单创建: {eventData.OrderId}, 金额: {eventData.Amount}");
        return true;
    },
    maxConcurrency: 10,
    timeoutMs: 5000);

注意:SubscribeAsync<string> 会按“原始字符串”处理,不做 JSON 反序列化。


4)手动确认模式(推荐关键业务使用)

await _mq.SubscribeManualAckAsync(
    "payment.topic",
    async (qm, ct) =>
    {
        try
        {
            Console.WriteLine($"收到: {qm.Body}");

            // 业务处理成功
            await _mq.AckAsync(qm.Topic, qm.DeliveryTag);
        }
        catch
        {
            // 失败重回队列
            await _mq.NackAsync(qm.Topic, qm.DeliveryTag, requeue: true);

            // 或丢弃/进死信:
            // await _mq.RejectAsync(qm.Topic, qm.DeliveryTag, requeue: false);
        }
    },
    prefetchCount: 50,
    useDeadLetterQueue: true);

5)取消订阅

await _mq.UnsubscribeAsync("demo.topic");

死信队列(DLQ)

发布或订阅时设置 useDeadLetterQueue: true,消费失败时可将消息投递至死信队列,便于后续排查与补偿处理。


最佳实践

  1. 普通文本消息:用 PublishAsync(string topic, string message) + SubscribeAsync(string...)
  2. 对象消息:用泛型发布/订阅,保证生产消费模型一致
  3. 关键业务建议用手动 Ack,确保“处理成功再确认”
  4. 不要吞异常,务必记录消费失败日志
  5. 合理设置 maxConcurrency/prefetchCount/timeoutMs

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.0.3 99 4/29/2026
1.0.0.2 104 4/28/2026
1.0.0.1 97 4/28/2026