Smart.RabbitMQ 1.0.0-beta.14

This is a prerelease version of Smart.RabbitMQ.
There is a newer version of this package available.
See the version list below for details.

Smart.RabbitMQ

English

Smart.RabbitMQ is a high-level abstraction library built on top of RabbitMQ.Client, providing a simple, thread-safe, and feature-rich API for RabbitMQ message queue operations. It supports .NET 8, 9, and 10.

Features

  • Simple API: Intuitive and easy-to-use interface for producing and consuming messages.
  • Factory Pattern: IRabbitMQFactory for creating producer and consumer instances.
  • Thread-Safe: All operations are designed for thread-safe concurrent access.
  • Lazy Initialization: Connections are initialized on first use to avoid blocking constructors.
  • Automatic Reconnection: Built-in connection recovery with configurable retry intervals.
  • Multiple Exchange Types: Support for Direct, Fanout, Topic, and Headers exchanges.
  • Message Acknowledgment: Flexible manual and automatic acknowledgment with error handling.
  • Dependency Injection: Seamless integration with Microsoft.Extensions.DependencyInjection.
  • JSON Serialization: Built-in JSON serialization with camelCase naming policy.
  • Queue Binding Management: Automatic prevention of duplicate bindings.
  • Configuration Validation: Built-in validation for RabbitMQ connection settings.
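As an illustration of the camelCase naming policy listed above, the sketch below shows what a serialized payload might look like on the wire. It assumes the library's serializer behaves like System.Text.Json with JsonNamingPolicy.CamelCase, which this page does not confirm. OrderCreated is a hypothetical message type used only for the example.

```csharp
using System;
using System.Text.Json;

// Assumption: Smart.RabbitMQ serializes like System.Text.Json with a camelCase policy.
var jsonOptions = new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

// Hypothetical message type, not part of the library.
var message = new OrderCreated(1, "Hello RabbitMQ");

string json = JsonSerializer.Serialize(message, jsonOptions);
Console.WriteLine(json); // {"id":1,"content":"Hello RabbitMQ"}

// The consuming side needs a type whose properties map back from the camelCase names.
OrderCreated? roundTripped = JsonSerializer.Deserialize<OrderCreated>(json, jsonOptions);
Console.WriteLine(roundTripped == message); // True (records compare by value)

public record OrderCreated(int Id, string Content);
```

If producers or consumers written in other languages share the queue, they would need to agree on these camelCase property names.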

Installation

Install the package via NuGet:

dotnet add package Smart.RabbitMQ

Core Components

IRabbitMQFactory

Factory interface for creating producer and consumer instances.

public interface IRabbitMQFactory
{
    SmartRabbitMQProducer CreateProducer();
    SmartRabbitMQConsumer<T> CreateConsumer<T>() where T : class;
    SmartRabbitMQConsumer CreateConsumer();
}

SmartRabbitMQProducer

Message producer with support for publishing messages to exchanges.

public class SmartRabbitMQProducer : IAsyncDisposable
{
    public Task ExchangeDeclareAsync(string exchange, ExchangeType type = ExchangeType.Direct, bool durable = true, CancellationToken token = default);
    public Task PublishAsync<T>(string exchange, string routingKey, T message, CancellationToken token = default);
    public Task PublishAsync(string exchange, string routingKey, string message, CancellationToken token = default);
}

SmartRabbitMQConsumer<T>

Generic message consumer with event-driven message processing.

public class SmartRabbitMQConsumer<T> : IAsyncDisposable where T : class
{
    public event AsyncMessageReceivedHandler<T> MessageReceived;
    
    public Task ExchangeDeclareAsync(string exchange, ExchangeType type = ExchangeType.Direct, bool durable = true, CancellationToken token = default);
    public Task<string> QueueDeclareAsync(string queue = "", bool durable = true, IDictionary<string, object?>? queueParams = null, CancellationToken token = default);
    public Task QueueBindAsync(string queue, string exchange, string routingKey, CancellationToken token = default);
    public Task BasicQosAsync(ushort prefetchCount = 0, CancellationToken token = default);
    public Task<string> StartConsumeAsync(bool autoAck = false, bool multiple = false, CancellationToken token = default);
    public Task CancelConsumeAsync(string consumerTag);
}

Configuration

RabbitMQOptions

Configuration options for RabbitMQ connection.

public class RabbitMQOptions
{
    public string? ClientName { get; set; }
    public string HostName { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string VirtualHost { get; set; } = "/";
}
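The defaults above suit a local broker. For other environments, one option is to fill RabbitMQOptions from environment variables rather than hard-coding credentials. This is a minimal sketch: the RABBITMQ_* variable names and the Env helper are hypothetical, and only the RabbitMQOptions properties and the SmartRabbitMQFactory constructor come from this page.

```csharp
using System;
using Smart.RabbitMQ;

// Hypothetical helper: returns the environment variable, or the fallback when unset or empty.
static string Env(string name, string fallback) =>
    Environment.GetEnvironmentVariable(name) is { Length: > 0 } value ? value : fallback;

// The RABBITMQ_* names are assumptions; use whatever your deployment defines.
var options = new RabbitMQOptions
{
    HostName = Env("RABBITMQ_HOST", "localhost"),
    Port = int.TryParse(Env("RABBITMQ_PORT", "5672"), out var port) ? port : 5672,
    UserName = Env("RABBITMQ_USER", "guest"),
    Password = Env("RABBITMQ_PASSWORD", "guest"),
    VirtualHost = Env("RABBITMQ_VHOST", "/")
};

var factory = new SmartRabbitMQFactory(options);
```

The fallbacks mirror the documented defaults, so the same code runs unchanged against a local broker.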

Usage Examples

1. Basic Producer

using Smart.RabbitMQ;

var options = new RabbitMQOptions
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/"
};

var factory = new SmartRabbitMQFactory(options);
var producer = factory.CreateProducer();

// Declare exchange
await producer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct);

// Publish message
var message = new { Id = 1, Content = "Hello RabbitMQ" };
await producer.PublishAsync("my.exchange", "my.routing.key", message);

// Dispose
await producer.DisposeAsync();

2. Basic Consumer

using Smart.RabbitMQ;

var options = new RabbitMQOptions
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/"
};

var factory = new SmartRabbitMQFactory(options);
var consumer = factory.CreateConsumer<MyMessage>();

// Subscribe to message event
consumer.MessageReceived += async (sender, args) =>
{
    Console.WriteLine($"Received message: {args.Data.Content}");
    Console.WriteLine($"Delivery Tag: {args.DeliveryTag}");
    Console.WriteLine($"Exchange: {args.Exchange}");
    Console.WriteLine($"Routing Key: {args.RoutingKey}");
};

// Declare exchange
await consumer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct);

// Declare queue
var queueName = await consumer.QueueDeclareAsync("my.queue", durable: true);

// Bind queue to exchange
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");

// Set QoS (prefetch count)
await consumer.BasicQosAsync(prefetchCount: 10);

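// Start consuming
var consumerTag = await consumer.StartConsumeAsync(autoAck: false);

// Keep the process alive while messages arrive (illustrative; a real service
// would wait on its host lifetime instead)
Console.ReadLine();

// Cancel consuming when done
await consumer.CancelConsumeAsync(consumerTag);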

// Dispose
await consumer.DisposeAsync();

3. Dependency Injection Integration

using Microsoft.Extensions.DependencyInjection;
using Smart.RabbitMQ;

// Configure services
var services = new ServiceCollection();

var rabbitMQOptions = new RabbitMQOptions
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/"
};

services.AddSmartRabbitMQ(rabbitMQOptions);

// Build service provider
var serviceProvider = services.BuildServiceProvider();

// Get factory
var factory = serviceProvider.GetRequiredService<IRabbitMQFactory>();

// Create producer
var producer = factory.CreateProducer();

// Create consumer
var consumer = factory.CreateConsumer<MyMessage>();

// Or get producer/consumer directly
var producer2 = serviceProvider.GetRequiredService<SmartRabbitMQProducer>();
var consumer2 = serviceProvider.GetRequiredService<SmartRabbitMQConsumer<MyMessage>>();

4. String Message Consumer

using Smart.RabbitMQ;

// options: a RabbitMQOptions instance, configured as in example 1
var factory = new SmartRabbitMQFactory(options);
var consumer = factory.CreateConsumer(); // String consumer

consumer.MessageReceived += async (sender, args) =>
{
    Console.WriteLine($"Received string message: {args.Data}");
};

await consumer.ExchangeDeclareAsync("my.exchange");
var queueName = await consumer.QueueDeclareAsync("my.queue");
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");
await consumer.StartConsumeAsync();

5. Advanced Consumer with Error Handling

using Smart.RabbitMQ;

// factory: created as in example 1. ProcessMessageAsync stands for your own handler.
var consumer = factory.CreateConsumer<MyMessage>();

consumer.MessageReceived += async (sender, args) =>
{
    try
    {
        // Process message
        await ProcessMessageAsync(args.Data);

        // If the handler returns without throwing, the message is acknowledged automatically
        Console.WriteLine("Message processed successfully");
    }
    catch (Exception ex)
    {
        // If an exception escapes the handler, the message is rejected and requeued
        Console.Error.WriteLine($"Error processing message: {ex.Message}");
        throw; // Re-throw to trigger Nack
    }
};

await consumer.StartConsumeAsync(autoAck: false);

6. Queue Parameters

using Smart.RabbitMQ;

var queueParams = new Dictionary<string, object?>
{
    ["x-message-ttl"] = 60000,            // Message TTL: 60 seconds (value in milliseconds)
    ["x-max-length"] = 10000,             // Max queue length (number of messages)
    ["x-expires"] = 3600000,              // Queue is deleted after 1 hour unused (value in milliseconds)
    ["x-dead-letter-exchange"] = "dlx"    // Dead letter exchange
};

// consumer: created as in example 2
var queueName = await consumer.QueueDeclareAsync(
    queue: "my.queue",
    durable: true,
    queueParams: queueParams
);

7. Exchange Types

using Smart.RabbitMQ;

// producer: created as in example 1

// Direct exchange: routes on an exact routing-key match
await producer.ExchangeDeclareAsync("direct.exchange", ExchangeType.Direct);

// Fanout exchange: broadcasts to every bound queue, ignoring the routing key
await producer.ExchangeDeclareAsync("fanout.exchange", ExchangeType.Fanout);

// Topic exchange: routes on routing-key patterns
await producer.ExchangeDeclareAsync("topic.exchange", ExchangeType.Topic);

// Headers exchange: routes on message headers instead of the routing key
await producer.ExchangeDeclareAsync("headers.exchange", ExchangeType.Headers);
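To make the topic exchange's pattern matching concrete, the sketch below binds a queue with a pattern and publishes two messages. It uses only the methods listed under Core Components. The queue name, routing keys, and the one-second delay are illustrative assumptions, and it expects a broker on localhost with the default guest account. The pattern rules are standard RabbitMQ behaviour: `*` matches exactly one word and `#` matches zero or more words.

```csharp
using System;
using System.Threading.Tasks;
using Smart.RabbitMQ;

// Default options: localhost:5672, guest/guest, virtual host "/".
var factory = new SmartRabbitMQFactory(new RabbitMQOptions());
await using var producer = factory.CreateProducer();
await using var consumer = factory.CreateConsumer(); // string consumer

consumer.MessageReceived += async (sender, args) =>
{
    Console.WriteLine($"{args.RoutingKey}: {args.Data}");
    await Task.CompletedTask;
};

await consumer.ExchangeDeclareAsync("topic.exchange", ExchangeType.Topic);
var queueName = await consumer.QueueDeclareAsync("orders.queue"); // illustrative name

// "orders.*" matches routing keys with exactly one word after "orders".
await consumer.QueueBindAsync(queueName, "topic.exchange", "orders.*");
await consumer.StartConsumeAsync();

// Matches the binding, so it is routed to orders.queue.
await producer.PublishAsync("topic.exchange", "orders.created", "order 1 created");

// Two words follow "orders", so "orders.*" does not match and the message is not routed here.
await producer.PublishAsync("topic.exchange", "orders.eu.created", "order 2 created");

// Give the broker a moment to deliver before the program exits (illustrative only).
await Task.Delay(TimeSpan.FromSeconds(1));
```

Binding with "orders.#" instead would route both messages to the queue.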

Best Practices

  1. Use Manual Acknowledgment: Set autoAck: false for better control over message processing and error handling.

  2. Set Prefetch Count: Use BasicQosAsync to limit the number of unacknowledged messages and prevent consumer overload.

     await consumer.BasicQosAsync(prefetchCount: 10);

  3. Dispose Resources: Always dispose of producer and consumer instances when done to release connections.

     await using var producer = factory.CreateProducer();
     await using var consumer = factory.CreateConsumer<MyMessage>();

  4. Handle Exceptions: Always wrap message processing in try-catch blocks to handle errors gracefully.

  5. Use Durable Queues: Set durable: true for production environments to survive RabbitMQ restarts.

  6. Validate Configuration: The library automatically validates configuration, but ensure your settings are correct before use.
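The practices above can be combined in one consumer. The following is a sketch under stated assumptions, not the library's prescribed pattern: it uses the string consumer to stay self-contained, HandleAsync is a hypothetical handler, and the Ctrl+C shutdown logic is one possible way to stop cleanly. The rethrow relies on the behaviour described in the error-handling example, where a thrown exception causes the message to be rejected.

```csharp
using System;
using System.Threading.Tasks;
using Smart.RabbitMQ;

var factory = new SmartRabbitMQFactory(new RabbitMQOptions());

// await using disposes the consumer at the end of the program, even if an exception escapes.
await using var consumer = factory.CreateConsumer();

consumer.MessageReceived += async (sender, args) =>
{
    try
    {
        await HandleAsync(args.Data);
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine($"Failed on delivery {args.DeliveryTag}: {ex.Message}");
        throw; // rethrow so the library can reject the message
    }
};

// Durable exchange and queue, bounded prefetch, manual acknowledgment.
await consumer.ExchangeDeclareAsync("my.exchange", ExchangeType.Direct, durable: true);
var queueName = await consumer.QueueDeclareAsync("my.queue", durable: true);
await consumer.QueueBindAsync(queueName, "my.exchange", "my.routing.key");
await consumer.BasicQosAsync(prefetchCount: 10);
var consumerTag = await consumer.StartConsumeAsync(autoAck: false);

// Wait for Ctrl+C, then stop consuming before the consumer is disposed.
var stop = new TaskCompletionSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; stop.TrySetResult(); };
await stop.Task;
await consumer.CancelConsumeAsync(consumerTag);

// Hypothetical handler, defined here only to keep the sketch complete.
static Task HandleAsync(string body)
{
    Console.WriteLine($"Processing: {body}");
    return Task.CompletedTask;
}
```

Cancelling the consumer before disposal gives in-flight handlers a chance to finish. Whether the library waits for them is not stated on this page.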

Event Arguments

MessageReceivedArgs<T>

Arguments passed to the message received event.

public class MessageReceivedArgs<T> : EventArgs
{
    public string ConsumerTag { get; set; }
    public ulong DeliveryTag { get; set; }
    public string Exchange { get; set; }
    public string RoutingKey { get; set; }
    public T Data { get; set; }
}
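The consumer examples read args.Data.Content from a MyMessage type that this page never defines. A minimal sketch of such a type follows. The property names are assumptions chosen to match the anonymous payload in the Basic Producer example (Id and Content). The class is a reference type because SmartRabbitMQConsumer<T> constrains T to class.

```csharp
// Hypothetical message type; the property names mirror the payload
// new { Id = 1, Content = "Hello RabbitMQ" } from the Basic Producer example.
public class MyMessage
{
    public int Id { get; set; }
    public string Content { get; set; } = string.Empty;
}
```

With a camelCase JSON payload such as {"id":1,"content":"Hello RabbitMQ"}, args.Data would then be expected to carry Id = 1 and Content = "Hello RabbitMQ".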

Project Structure

Smart.RabbitMQ/
├── SmartRabbitMQFactory.cs       - Factory for creating producers and consumers
├── SmartRabbitMQProducer.cs      - Message producer implementation
├── SmartRabbitMQConsumer.cs      - Generic message consumer implementation
├── SerializeConvert.cs           - JSON serialization utilities
├── RabbitMQOptions.cs            - Configuration options
├── MessageReceivedArgs.cs        - Event argument classes
├── ExchangeType.cs               - Exchange type enumeration
├── IRabbitMQFactory.cs           - Factory interface
└── RabbitMQServiceExtensions.cs  - Dependency injection extensions

Supported .NET Versions

  • .NET 8
  • .NET 9
  • .NET 10

Developed by zenglei

Compatible target frameworks

  • net8.0, net9.0, and net10.0 are compatible.
  • For each of these, the android, browser, ios, maccatalyst, macos, tvos, and windows variants were computed as compatible.

Version          Downloads   Last Updated
1.1.0            93          2/17/2026
1.0.0            95          2/14/2026
1.0.0-beta.14    53          2/11/2026
1.0.0-beta.13    54          2/10/2026
1.0.0-beta.12    65          12/30/2025
1.0.0-beta.11    138         4/5/2025
1.0.0-beta.10    194         3/20/2025
1.0.0-beta.9     206         3/17/2025
1.0.0-beta.8     173         3/16/2025
1.0.0-beta.7     183         3/16/2025
1.0.0-beta.6     178         3/16/2025
1.0.0-beta.5     180         3/16/2025
1.0.0-beta.4     225         3/4/2025
1.0.0-beta.3     228         3/4/2025
1.0.0-beta.2     229         3/3/2025
1.0.0-beta.1     155         3/2/2025