Muta.MessageQueue.RabbitMq
1.0.0.1
There is a newer version of this package available.
See the version list below for details.
See the version list below for details.
dotnet add package Muta.MessageQueue.RabbitMq --version 1.0.0.1
NuGet\Install-Package Muta.MessageQueue.RabbitMq -Version 1.0.0.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Muta.MessageQueue.RabbitMq" Version="1.0.0.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Muta.MessageQueue.RabbitMq" Version="1.0.0.1" />
<PackageReference Include="Muta.MessageQueue.RabbitMq" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Muta.MessageQueue.RabbitMq --version 1.0.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Muta.MessageQueue.RabbitMq, 1.0.0.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Muta.MessageQueue.RabbitMq@1.0.0.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Muta.MessageQueue.RabbitMq&version=1.0.0.1
#tool nuget:?package=Muta.MessageQueue.RabbitMq&version=1.0.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
一个面向 .NET 的 RabbitMQ 异步消息队列封装库,提供统一、简洁、可扩展的消息发布与订阅 API,帮助业务快速接入消息驱动架构。
特性
- ✅ 异步优先:基于
async/await,适配高并发场景 - ✅ 统一接口抽象:通过
IMessageQueueService屏蔽底层 MQ 实现细节 - ✅ 支持字符串与泛型消息:开箱即用的 JSON 序列化/反序列化能力
- ✅ 自动确认 / 手动确认:满足不同可靠性与业务控制需求
- ✅ Dead Letter Queue(DLQ)支持:支持失败消息转入死信队列
- ✅ 消费超时控制:支持单条消息处理超时取消
- ✅ 连接自动恢复:断线重连、拓扑恢复(RabbitMQ Client Recovery)
- ✅ 发布通道池化:降低频繁创建 Channel 的开销,提升吞吐
- ✅ 线程安全设计:适用于多线程并发发布/消费场景
适用场景
- 订单、支付、库存等业务解耦
- 异步任务削峰填谷
- 事件驱动架构(EDA)
- 失败重试与死信处理
- 微服务间轻量消息通信
快速开始
- 注入并连接
RabbitMqService - 使用
PublishAsync发布消息 - 使用
SubscribeAsync或SubscribeManualAckAsync消费消息 - 按需使用
Ack/Nack/Reject与 DLQ
设计目标
- 简单可用:业务方无需关心 RabbitMQ 复杂细节
- 稳定可靠:优先保证连接与消费链路稳定性
- 性能平衡:在通用业务场景下提供良好吞吐表现
配置
在 appsettings.json 中添加:
{
"MessageQueue": {
"RabbitMQ": {
"Hostname": "127.0.0.1",
"Port": "5672",
"Username": "guest",
"Password": "guest"
}
}
}
注册服务(DI)
builder.Services.AddMutaRabbitMqMessageQueue();
使用方式
1)连接 MQ
public class MqHostedService : IHostedService
{
private readonly IMessageQueueService _mq;
public MqHostedService(IMessageQueueService mq)
{
_mq = mq;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _mq.ConnectAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _mq.CloseAsync(cancellationToken);
}
}
2)发布消息
发布字符串消息
await _mq.PublishAsync("demo.topic", "hello rabbitmq");
发布对象消息(自动 JSON 序列化)
var order = new { OrderId = 1001, Amount = 88.5 };
await _mq.PublishAsync("order.created", order);
3)订阅消息(自动确认模式)
订阅字符串
await _mq.SubscribeAsync(
"demo.topic",
async (msg, ct) =>
{
Console.WriteLine($"收到消息: {msg}");
return true; // true => Ack,false => Nack/Reject
},
maxConcurrency: 10,
timeoutMs: 5000,
useDeadLetterQueue: false);
订阅对象
public class OrderCreatedEvent
{
public int OrderId { get; set; }
public decimal Amount { get; set; }
}
await _mq.SubscribeAsync<OrderCreatedEvent>(
"order.created",
async (eventData, ct) =>
{
Console.WriteLine($"订单创建: {eventData.OrderId}, 金额: {eventData.Amount}");
return true;
},
maxConcurrency: 10,
timeoutMs: 5000);
注意:
SubscribeAsync<string>会按“原始字符串”处理,不做 JSON 反序列化。
4)手动确认模式(推荐关键业务使用)
await _mq.SubscribeManualAckAsync(
"payment.topic",
async (qm, ct) =>
{
try
{
Console.WriteLine($"收到: {qm.Body}");
// 业务处理成功
await _mq.AckAsync(qm.Topic, qm.DeliveryTag);
}
catch
{
// 失败重回队列
await _mq.NackAsync(qm.Topic, qm.DeliveryTag, requeue: true);
// 或丢弃/进死信:
// await _mq.RejectAsync(qm.Topic, qm.DeliveryTag, requeue: false);
}
},
prefetchCount: 50,
useDeadLetterQueue: true);
5)取消订阅
await _mq.UnsubscribeAsync("demo.topic");
死信队列(DLQ)
发布或订阅时设置 useDeadLetterQueue: true,消费失败时可将消息投递至死信队列,便于后续排查与补偿处理。
最佳实践
- 普通文本消息:用
PublishAsync(string topic, string message)+SubscribeAsync(string...) - 对象消息:用泛型发布/订阅,保证生产消费模型一致
- 关键业务建议用手动 Ack,确保“处理成功再确认”
- 不要吞异常,务必记录消费失败日志
- 合理设置
maxConcurrency/prefetchCount/timeoutMs
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.1 is compatible. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
.NETStandard 2.1
- Microsoft.Extensions.Configuration (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Newtonsoft.Json (>= 13.0.4)
- RabbitMQ.Client (>= 7.2.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.