Smart.RabbitMQ
1.1.0
dotnet add package Smart.RabbitMQ --version 1.1.0
NuGet\Install-Package Smart.RabbitMQ -Version 1.1.0
<PackageReference Include="Smart.RabbitMQ" Version="1.1.0" />
<PackageVersion Include="Smart.RabbitMQ" Version="1.1.0" />
<PackageReference Include="Smart.RabbitMQ" />
paket add Smart.RabbitMQ --version 1.1.0
#r "nuget: Smart.RabbitMQ, 1.1.0"
#:package Smart.RabbitMQ@1.1.0
#addin nuget:?package=Smart.RabbitMQ&version=1.1.0
#tool nuget:?package=Smart.RabbitMQ&version=1.1.0
Smart.RabbitMQ
<a name="english"></a>
English
Smart.RabbitMQ is a high-level abstraction library built on top of RabbitMQ.Client, providing a simple, thread-safe, and feature-rich API for RabbitMQ message queue operations. It supports .NET 8, 9, and 10.
Features
- Simple API: Intuitive and easy-to-use interface for producing and consuming messages.
- Factory Pattern:
IRabbitMQFactoryfor creating producer and consumer instances. - Connection Sharing: All producers and consumers created by the same factory share a single connection, reducing resource overhead.
- Thread-Safe: All operations are designed for thread-safe concurrent access.
- Lazy Initialization: Channels are initialized on first use to avoid blocking constructors.
- Automatic Reconnection: Built-in connection recovery with configurable retry intervals.
- Multiple Exchange Types: Support for Direct, Fanout, Topic, and Headers exchanges.
- Message Acknowledgment: Flexible manual and automatic acknowledgment with error handling.
- Dependency Injection: Seamless integration with Microsoft.Extensions.DependencyInjection.
- JSON Serialization: Built-in JSON serialization with camelCase naming policy.
- Queue Binding Management: Automatic prevention of duplicate bindings.
- Configuration Validation: Built-in validation for RabbitMQ connection settings.
Installation
Install the package via NuGet:
dotnet add package Smart.RabbitMQ
Core Components
IRabbitMQFactory
Factory interface for creating producer and consumer instances. All instances share the same connection.
public interface IRabbitMQFactory : IAsyncDisposable
{
Task<SmartRabbitMQProducer> CreateProducerAsync();
Task<SmartRabbitMQConsumer<T>> CreateConsumerAsync<T>() where T : class;
Task<SmartRabbitMQConsumer> CreateConsumerAsync();
}
SmartRabbitMQProducer
Message producer with support for publishing messages to exchanges.
public class SmartRabbitMQProducer : IAsyncDisposable
{
public Task ExchangeDeclareAsync(string exchange, ExchangeType type = ExchangeType.Direct, bool durable = true, CancellationToken token = default);
public Task PublishAsync<T>(string exchange, string routingKey, T message, CancellationToken token = default);
public Task PublishAsync(string exchange, string routingKey, string message, CancellationToken token = default);
}
SmartRabbitMQConsumer<T>
Generic message consumer with event-driven message processing.
public class SmartRabbitMQConsumer<T> : IAsyncDisposable where T : class
{
public event AsyncMessageReceivedHandler<T> MessageReceived;
public Task ExchangeDeclareAsync(string exchange, ExchangeType type = ExchangeType.Direct, bool durable = true, CancellationToken token = default);
public Task<string> QueueDeclareAsync(string queue = "", bool durable = true, IDictionary<string, object?>? queueParams = null, CancellationToken token = default);
public Task QueueBindAsync(string queue, string exchange, string routingKey, CancellationToken token = default);
public Task BasicQosAsync(ushort prefetchCount = 0, CancellationToken token = default);
public Task<string> StartConsumeAsync(bool autoAck = false, bool multiple = false, bool requeueOnFailure = true, CancellationToken token = default);
public Task CancelConsumeAsync(string consumerTag);
}
Configuration
RabbitMqOption
Configuration options for RabbitMQ connection.
public class RabbitMqOption
{
public string? ClientName { get; set; }
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string VirtualHost { get; set; } = "/";
}
Usage Examples
1. Basic Producer
using Smart.RabbitMQ;
var options = new RabbitMqOption
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
var factory = new SmartRabbitMQFactory(options);
var producer = await factory.CreateProducerAsync();
// Declare exchange
await producer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct);
// Publish message
var message = new { Id = 1, Content = "Hello RabbitMQ" };
await producer.PublishAsync("my.exchange", "my.routing.key", message);
// Dispose producer (connection is managed by factory)
await producer.DisposeAsync();
// Dispose factory when done (closes shared connection)
await factory.DisposeAsync();
2. Basic Consumer
using Smart.RabbitMQ;
var options = new RabbitMqOption
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
var factory = new SmartRabbitMQFactory(options);
var consumer = await factory.CreateConsumerAsync<MyMessage>();
// Subscribe to message event
consumer.MessageReceived += async (sender, args) =>
{
Console.WriteLine($"Received message: {args.Data?.Content}");
Console.WriteLine($"Delivery Tag: {args.DeliveryTag}");
Console.WriteLine($"Exchange: {args.Exchange}");
Console.WriteLine($"Routing Key: {args.RoutingKey}");
};
// Declare exchange
await consumer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct);
// Declare queue
var queueName = await consumer.QueueDeclareAsync("my.queue", durable: true);
// Bind queue to exchange
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");
// Set QoS (prefetch count)
await consumer.BasicQosAsync(prefetchCount: 10);
// Start consuming
var consumerTag = await consumer.StartConsumeAsync(autoAck: false);
// Cancel consuming when done
await consumer.CancelConsumeAsync(consumerTag);
// Dispose consumer (connection is managed by factory)
await consumer.DisposeAsync();
// Dispose factory when done (closes shared connection)
await factory.DisposeAsync();
3. Dependency Injection Integration
using Microsoft.Extensions.DependencyInjection;
using Smart.RabbitMQ;
// Configure services
var services = new ServiceCollection();
var rabbitMQOptions = new RabbitMqOption
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
services.AddSmartRabbitMQ(rabbitMQOptions);
// Build service provider
var serviceProvider = services.BuildServiceProvider();
// Get factory
var factory = serviceProvider.GetRequiredService<IRabbitMQFactory>();
// Create producer
var producer = await factory.CreateProducerAsync();
// Create consumer
var consumer = await factory.CreateConsumerAsync<MyMessage>();
// Or get producer/consumer directly (pre-created singletons)
var producer2 = serviceProvider.GetRequiredService<SmartRabbitMQProducer>();
var consumer2 = serviceProvider.GetRequiredService<SmartRabbitMQConsumer<MyMessage>>();
4. String Message Consumer
using Smart.RabbitMQ;
var factory = new SmartRabbitMQFactory(options);
var consumer = await factory.CreateConsumerAsync(); // String consumer
consumer.MessageReceived += async (sender, args) =>
{
Console.WriteLine($"Received string message: {args.Data}");
};
await consumer.ExchangeDeclareAsync("my.exchange");
var queueName = await consumer.QueueDeclareAsync("my.queue");
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");
await consumer.StartConsumeAsync();
5. Advanced Consumer with Error Handling
using Smart.RabbitMQ;
var consumer = await factory.CreateConsumerAsync<MyMessage>();
consumer.MessageReceived += async (sender, args) =>
{
try
{
// Process message
await ProcessMessageAsync(args.Data);
// If successful, message will be automatically acknowledged
Console.WriteLine("Message processed successfully");
}
catch (Exception ex)
{
// If exception occurs, message will be rejected and requeued
Console.Error.WriteLine($"Error processing message: {ex.Message}");
throw; // Re-throw to trigger Nack
}
};
await consumer.StartConsumeAsync(autoAck: false);
6. Queue Parameters
using Smart.RabbitMQ;
var queueParams = new Dictionary<string, object?>
{
["x-message-ttl"] = 60000, // Message TTL: 60 seconds
["x-max-length"] = 10000, // Max queue length
["x-expires"] = 3600000, // Queue expires after 1 hour
["x-dead-letter-exchange"] = "dlx" // Dead letter exchange
};
var queueName = await consumer.QueueDeclareAsync(
queue: "my.queue",
durable: true,
queueParams: queueParams
);
7. Exchange Types
using Smart.RabbitMQ;
// Direct Exchange (Point-to-Point)
await producer.ExchangeDeclareAsync("direct.exchange", ExchangeType.Direct);
// Fanout Exchange (Broadcast)
await producer.ExchangeDeclareAsync("fanout.exchange", ExchangeType.Fanout);
// Topic Exchange (Pattern Matching)
await producer.ExchangeDeclareAsync("topic.exchange", ExchangeType.Topic);
// Headers Exchange (Header-based Routing)
await producer.ExchangeDeclareAsync("headers.exchange", ExchangeType.Headers);
Best Practices
Use Manual Acknowledgment: Set
autoAck: falsefor better control over message processing and error handling.Set Prefetch Count: Use
BasicQosAsyncto limit the number of unacknowledged messages and prevent consumer overload.
await consumer.BasicQosAsync(prefetchCount: 10);
- Dispose Resources: Dispose producer and consumer instances when done. The factory manages the shared connection.
await using var producer = await factory.CreateProducerAsync();
await using var consumer = await factory.CreateConsumerAsync<MyMessage>();
// When completely done, dispose the factory to close the connection
await factory.DisposeAsync();
Handle Exceptions: Always wrap message processing in try-catch blocks to handle errors gracefully.
Use Durable Queues: Set
durable: truefor production environments to survive RabbitMQ restarts.Validate Configuration: The library automatically validates configuration, but ensure your settings are correct before use.
Event Arguments
MessageReceivedArgs<T>
Arguments passed to the message received event.
public class MessageReceivedArgs<T> : EventArgs
{
public string? ConsumerTag { get; init; }
public ulong DeliveryTag { get; init; }
public string? Exchange { get; init; }
public string? RoutingKey { get; init; }
public T? Data { get; }
}
Project Structure
Smart.RabbitMQ/
├── SmartRabbitMQFactory.cs - Factory for creating producers and consumers
├── SmartRabbitMQProducer.cs - Message producer implementation
├── SmartRabbitMQConsumer.cs - Generic message consumer implementation
├── SerializeConvert.cs - JSON serialization utilities
├── RabbitMqOption.cs - Configuration options
├── MessageReceivedArgs.cs - Event argument classes
├── ExchangeType.cs - Exchange type enumeration
├── IRabbitMQFactory.cs - Factory interface
└── RabbitMQServiceExtensions.cs - Dependency injection extensions
Supported .NET Versions
- .NET 6
- .NET 8
- .NET 9
- .NET 10
<a name="chinese"></a>
中文
Smart.RabbitMQ 是一个基于 RabbitMQ.Client 构建的高级抽象库,为 RabbitMQ 消息队列操作提供了简单、线程安全且功能丰富的 API。它支持 .NET 8, 9 和 10。
功能特性
- 简单易用的 API: 直观且易于使用的生产者和消费消息接口。
- 工厂模式:
IRabbitMQFactory用于创建生产者和消费者实例。 - 连接共享: 同一工厂创建的所有生产者和消费者共享同一个连接,减少资源开销。
- 线程安全: 所有操作都设计为线程安全的并发访问。
- 延迟初始化: 通道在首次使用时初始化,避免阻塞构造函数。
- 自动重连: 内置连接恢复功能,支持可配置的重试间隔。
- 多种交换机类型: 支持 Direct、Fanout、Topic 和 Headers 交换机。
- 消息确认: 灵活的手动和自动确认机制,带有错误处理。
- 依赖注入: 与 Microsoft.Extensions.DependencyInjection 无缝集成。
- JSON 序列化: 内置 JSON 序列化,使用 camelCase 命名策略。
- 队列绑定管理: 自动防止重复绑定。
- 配置验证: 内置 RabbitMQ 连接设置的验证功能。
安装
通过 NuGet 安装:
dotnet add package Smart.RabbitMQ
核心组件
IRabbitMQFactory
用于创建生产者和消费者实例的工厂接口。所有实例共享同一个连接。
public interface IRabbitMQFactory : IAsyncDisposable
{
Task<SmartRabbitMQProducer> CreateProducerAsync();
Task<SmartRabbitMQConsumer<T>> CreateConsumerAsync<T>() where T : class;
Task<SmartRabbitMQConsumer> CreateConsumerAsync();
}
SmartRabbitMQProducer
消息生产者,支持向交换机发布消息。
public class SmartRabbitMQProducer : IAsyncDisposable
{
public Task ExchangeDeclareAsync(string exchange, ExchangeType type = ExchangeType.Direct, bool durable = true, CancellationToken token = default);
public Task PublishAsync<T>(string exchange, string routingKey, T message, CancellationToken token = default);
public Task PublishAsync(string exchange, string routingKey, string message, CancellationToken token = default);
}
SmartRabbitMQConsumer<T>
泛型消息消费者,支持事件驱动的消息处理。
public class SmartRabbitMQConsumer<T> : IAsyncDisposable where T : class
{
public event AsyncMessageReceivedHandler<T> MessageReceived;
public Task ExchangeDeclareAsync(string exchange, ExchangeType type = ExchangeType.Direct, bool durable = true, CancellationToken token = default);
public Task<string> QueueDeclareAsync(string queue = "", bool durable = true, IDictionary<string, object?>? queueParams = null, CancellationToken token = default);
public Task QueueBindAsync(string queue, string exchange, string routingKey, CancellationToken token = default);
public Task BasicQosAsync(ushort prefetchCount = 0, CancellationToken token = default);
public Task<string> StartConsumeAsync(bool autoAck = false, bool multiple = false, bool requeueOnFailure = true, CancellationToken token = default);
public Task CancelConsumeAsync(string consumerTag);
}
配置
RabbitMqOption
RabbitMQ 连接配置选项。
public class RabbitMqOption
{
public string? ClientName { get; set; }
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string VirtualHost { get; set; } = "/";
}
使用示例
1. 基础生产者
using Smart.RabbitMQ;
var options = new RabbitMqOption
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
var factory = new SmartRabbitMQFactory(options);
var producer = await factory.CreateProducerAsync();
// 声明交换机
await producer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct);
// 发布消息
var message = new { Id = 1, Content = "Hello RabbitMQ" };
await producer.PublishAsync("my.exchange", "my.routing.key", message);
// 释放生产者(连接由工厂管理)
await producer.DisposeAsync();
// 完成后释放工厂(关闭共享连接)
await factory.DisposeAsync();
2. 基础消费者
using Smart.RabbitMQ;
var options = new RabbitMqOption
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
var factory = new SmartRabbitMQFactory(options);
var consumer = await factory.CreateConsumerAsync<MyMessage>();
// 订阅消息事件
consumer.MessageReceived += async (sender, args) =>
{
Console.WriteLine($"收到消息: {args.Data?.Content}");
Console.WriteLine($"投递标签: {args.DeliveryTag}");
Console.WriteLine($"交换机: {args.Exchange}");
Console.WriteLine($"路由键: {args.RoutingKey}");
};
// 声明交换机
await consumer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct);
// 声明队列
var queueName = await consumer.QueueDeclareAsync("my.queue", durable: true);
// 绑定队列到交换机
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");
// 设置 QoS(预取计数)
await consumer.BasicQosAsync(prefetchCount: 10);
// 开始消费
var consumerTag = await consumer.StartConsumeAsync(autoAck: false);
// 完成后取消消费
await consumer.CancelConsumeAsync(consumerTag);
// 释放消费者(连接由工厂管理)
await consumer.DisposeAsync();
// 完成后释放工厂(关闭共享连接)
await factory.DisposeAsync();
3. 依赖注入集成
using Microsoft.Extensions.DependencyInjection;
using Smart.RabbitMQ;
// 配置服务
var services = new ServiceCollection();
var rabbitMQOptions = new RabbitMqOption
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
services.AddSmartRabbitMQ(rabbitMQOptions);
// 构建服务提供者
var serviceProvider = services.BuildServiceProvider();
// 获取工厂
var factory = serviceProvider.GetRequiredService<IRabbitMQFactory>();
// 创建生产者
var producer = await factory.CreateProducerAsync();
// 创建消费者
var consumer = await factory.CreateConsumerAsync<MyMessage>();
// 或者直接获取生产者/消费者(预先创建的单例)
var producer2 = serviceProvider.GetRequiredService<SmartRabbitMQProducer>();
var consumer2 = serviceProvider.GetRequiredService<SmartRabbitMQConsumer<MyMessage>>();
4. 字符串消息消费者
using Smart.RabbitMQ;
var factory = new SmartRabbitMQFactory(options);
var consumer = await factory.CreateConsumerAsync(); // 字符串消费者
consumer.MessageReceived += async (sender, args) =>
{
Console.WriteLine($"收到字符串消息: {args.Data}");
};
await consumer.ExchangeDeclareAsync("my.exchange");
var queueName = await consumer.QueueDeclareAsync("my.queue");
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");
await consumer.StartConsumeAsync();
5. 高级消费者与错误处理
using Smart.RabbitMQ;
var consumer = await factory.CreateConsumerAsync<MyMessage>();
consumer.MessageReceived += async (sender, args) =>
{
try
{
// 处理消息
await ProcessMessageAsync(args.Data);
// 如果成功,消息将被自动确认
Console.WriteLine("消息处理成功");
}
catch (Exception ex)
{
// 如果发生异常,消息将被拒绝并重新入队
Console.Error.WriteLine($"处理消息时出错: {ex.Message}");
throw; // 重新抛出以触发 Nack
}
};
await consumer.StartConsumeAsync(autoAck: false);
6. 队列参数
using Smart.RabbitMQ;
var queueParams = new Dictionary<string, object?>
{
["x-message-ttl"] = 60000, // 消息 TTL: 60 秒
["x-max-length"] = 10000, // 队列最大长度
["x-expires"] = 3600000, // 队列在 1 小时后过期
["x-dead-letter-exchange"] = "dlx" // 死信交换机
};
var queueName = await consumer.QueueDeclareAsync(
queue: "my.queue",
durable: true,
queueParams: queueParams
);
7. 交换机类型
using Smart.RabbitMQ;
// 直连交换机(点对点)
await producer.ExchangeDeclareAsync("direct.exchange", ExchangeType.Direct);
// 扇形交换机(广播)
await producer.ExchangeDeclareAsync("fanout.exchange", ExchangeType.Fanout);
// 主题交换机(模式匹配)
await producer.ExchangeDeclareAsync("topic.exchange", ExchangeType.Topic);
// 头交换机(基于头的路由)
await producer.ExchangeDeclareAsync("headers.exchange", ExchangeType.Headers);
最佳实践
使用手动确认: 设置
autoAck: false以更好地控制消息处理和错误处理。设置预取计数: 使用
BasicQosAsync限制未确认消息的数量,防止消费者过载。
await consumer.BasicQosAsync(prefetchCount: 10);
- 释放资源: 使用完毕后释放生产者和消费者实例。工厂管理共享连接。
await using var producer = await factory.CreateProducerAsync();
await using var consumer = await factory.CreateConsumerAsync<MyMessage>();
// 完全完成后,释放工厂以关闭连接
await factory.DisposeAsync();
处理异常: 始终在 try-catch 块中包装消息处理,以优雅地处理错误。
使用持久化队列: 在生产环境中设置
durable: true以在 RabbitMQ 重启后存活。验证配置: 库会自动验证配置,但请确保在使用前设置正确。
事件参数
MessageReceivedArgs<T>
传递给消息接收事件的参数。
public class MessageReceivedArgs<T> : EventArgs
{
public string? ConsumerTag { get; init; }
public ulong DeliveryTag { get; init; }
public string? Exchange { get; init; }
public string? RoutingKey { get; init; }
public T? Data { get; }
}
项目结构
Smart.RabbitMQ/
├── SmartRabbitMQFactory.cs - 用于创建生产者和消费者的工厂
├── SmartRabbitMQProducer.cs - 消息生产者实现
├── SmartRabbitMQConsumer.cs - 泛型消息消费者实现
├── SerializeConvert.cs - JSON 序列化工具
├── RabbitMqOption.cs - 配置选项
├── MessageReceivedArgs.cs - 事件参数类
├── ExchangeType.cs - 交换机类型枚举
├── IRabbitMQFactory.cs - 工厂接口
└── RabbitMQServiceExtensions.cs - 依赖注入扩展
支持的 .NET 版本
- .NET 8
- .NET 9
- .NET 10
Developed by zenglei
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.DependencyInjection (>= 9.0.0)
- RabbitMQ.Client (>= 7.2.0)
-
net8.0
- Microsoft.Extensions.DependencyInjection (>= 9.0.0)
- RabbitMQ.Client (>= 7.2.0)
-
net9.0
- Microsoft.Extensions.DependencyInjection (>= 9.0.0)
- RabbitMQ.Client (>= 7.2.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.1.0 | 93 | 2/17/2026 |
| 1.0.0 | 95 | 2/14/2026 |
| 1.0.0-beta.14 | 53 | 2/11/2026 |
| 1.0.0-beta.13 | 54 | 2/10/2026 |
| 1.0.0-beta.12 | 65 | 12/30/2025 |
| 1.0.0-beta.11 | 138 | 4/5/2025 |
| 1.0.0-beta.10 | 194 | 3/20/2025 |
| 1.0.0-beta.9 | 206 | 3/17/2025 |
| 1.0.0-beta.8 | 172 | 3/16/2025 |
| 1.0.0-beta.7 | 183 | 3/16/2025 |
| 1.0.0-beta.6 | 178 | 3/16/2025 |
| 1.0.0-beta.5 | 180 | 3/16/2025 |
| 1.0.0-beta.4 | 225 | 3/4/2025 |
| 1.0.0-beta.3 | 228 | 3/4/2025 |
| 1.0.0-beta.2 | 229 | 3/3/2025 |
| 1.0.0-beta.1 | 155 | 3/2/2025 |