Galosys.Foundation.RabbitMQ.Client 26.5.12.3

There is a newer version of this package available.
See the version list below for details.
dotnet add package Galosys.Foundation.RabbitMQ.Client --version 26.5.12.3
                    
NuGet\Install-Package Galosys.Foundation.RabbitMQ.Client -Version 26.5.12.3
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="26.5.12.3" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Galosys.Foundation.RabbitMQ.Client" Version="26.5.12.3" />
                    
Directory.Packages.props
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Galosys.Foundation.RabbitMQ.Client --version 26.5.12.3
                    
#r "nuget: Galosys.Foundation.RabbitMQ.Client, 26.5.12.3"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Galosys.Foundation.RabbitMQ.Client@26.5.12.3
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Galosys.Foundation.RabbitMQ.Client&version=26.5.12.3
                    
Install as a Cake Addin
#tool nuget:?package=Galosys.Foundation.RabbitMQ.Client&version=26.5.12.3
                    
Install as a Cake Tool

Galosys.Foundation.RabbitMQ.Client

成熟度: 🟢 稳定 — 生产级 RabbitMQ 客户端,连接池 + 批量管道 + Publisher Confirms + OTel 可观测性

基于 RabbitMQ.Client 的生产级实现,继承 Galosys.Foundation.Amqp 抽象层,提供连接池、批量发布管道、Publisher Confirms、拓扑缓存、DLX/DLQ 死信、OpenTelemetry 追踪/指标等企业级特性。

架构

Galosys.Foundation.Amqp                ← 抽象层(AmqpTemplate / AmqpPublisher / AmqpConsumer)
    ▲
    │
Galosys.Foundation.RabbitMQ.Client     ← 本模块(RabbitTemplate / RabbitPublisher / RabbitConsumer)

功能特性

  • 连接池 — 生产者/消费者独立连接池,Round-Robin Channel 分配,信号量限流
  • 批量发布管道BatchPublishPipeline(有界 Channel + 后台刷盘),Publisher Confirms 逐消息确认
  • 自动拓扑管理TopologyCache 缓存已声明的队列/交换器,断连自动重建
  • DLX/DLQ 死信 — 消费失败自动路由到死信交换器 + 死信队列
  • 指数退避重试 — 消费端失败自动重试(x-delayed-message 插件),超限转入 DLQ
  • 多主机支持 — Ping 选最优节点,自动网络恢复
  • OTel 追踪 — W3C traceparent 传播,Producer/Consumer Span
  • OTel 指标 — 7 个 Counter/Histogram/Gauge(发布数、消费数、延迟、活跃连接数等)
  • 健康检查RabbitMQHealthCheck 聚合生产者/消费者连接池状态(Healthy/Degraded/Unhealthy)
  • ICloudBus — 通过 AmqpTemplate 实现 Core 的 ICloudBus 接口

前置条件

  • RabbitMQ 3.x+ 服务端
  • 开启 rabbitmq_delayed_message_exchange 插件(消费重试需要)
  • 建议开启 rabbitmq_management 插件(运维监控)

安装

<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />

快速开始

1. 最小配置

{
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/"
  }
}

AppName 默认取应用名称,AutoCreateEnabled 默认 true 自动创建队列/交换器。

2. 注册服务

// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
    .UseModularization()
    .Locate()
    .RunAsync();

// 方式二:手动注册
services.AddRabbitMQ(configuration);

3. 发送消息

using RabbitMQ.Client;

public class OrderCreatedMessage : IMessage
{
    public long Id { get; set; }
    public string? Key { get; set; }
    public DateTime OccurredOn { get; set; }
    public string OrderNo { get; set; }
}

public class OrderService
{
    private readonly AmqpTemplate _amqp;

    public OrderService(AmqpTemplate amqp) => _amqp = amqp;

    // 普通消息(通过 BatchPublishPipeline → Publisher Confirms)
    public async Task SendAsync() =>
        await _amqp.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" });

    // 广播消息(fanout)
    public async Task BroadcastAsync() =>
        await _amqp.BroadcastAsync("order_events", new OrderCreatedMessage { OrderNo = "ORD001" });

    // 延迟消息
    public async Task DelayAsync() =>
        await _amqp.DelayAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" }, delayMillionSeconds: 5000);

    // 自定义消息属性
    public async Task SendWithPropsAsync() =>
        await _amqp.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" },
            postMessageProcessor: props =>
            {
                props.SetCustomProperty("x-correlation-id", "corr-123");
            });
}

4. 消费消息

using RabbitMQ.Client;

[Handler]
public class OrderHandler : IMessageHandler
{
    [AmqpHandler("order_created")]
    public async Task<bool> HandleAsync(IMessage msg)
    {
        // 处理消息
        return true; // true = ACK, false = Nack → 重试或进入 DLQ
    }

    // 广播消费(fanout + 持久化队列)
    [AmqpHandler("order_events", Broadcast = true, Durable = true)]
    public async Task<bool> OnEventAsync(IMessage msg)
    {
        return true;
    }
}

注册机制: 类标注 [Handler] → DI 自动注册;方法标注 [AmqpHandler] → 启动时自动扫描订阅。

5. 使用 ICloudBus

public class MyService
{
    private readonly ICloudBus _bus;

    public MyService(ICloudBus bus) => _bus = bus;

    public async Task PublishAsync(MyEvent evt) =>
        await _bus.PublishAsync("my_event", evt);
}

详细配置

完整配置示例

{
  "RabbitMQ": {
    "HostName": "rabbit1, rabbit2, rabbit3",
    "UserName": "user",
    "Password": "pass",
    "VirtualHost": "/",
    "AutoCreateEnabled": true,
    "ConsumerDispatchConcurrency": 16,
    "NetworkRecoveryInterval": 10,
    "RequestedHeartbeat": 45,
    "ContinuationTimeout": 20,
    "Limit": {
      "ProducerMaxConnections": 4,
      "ProducerChannelMaxPerConnection": 10,
      "ConsumerMaxConnections": 4,
      "ConsumerChannelMaxPerConnection": 50,
      "MaxMessageSize": 65536,
      "MaxTps": 0,
      "BatchSize": 100,
      "BatchTimeoutMs": 50,
      "PublishConfirmTimeoutMs": 3000,
      "RetryCount": 3,
      "RetryBaseDelayMs": 1000,
      "PipelineCapacity": 5000
    },
    "Handlers": {
      "order_created": {
        "Concurrency": 4,
        "MaxRetryCount": 3,
        "RetryBaseDelayMs": 1000,
        "PrefetchMultiplier": 2,
        "DurableQueue": false,
        "QueueName": null
      }
    }
  }
}

配置项详细说明见 Amqp 模块文档

可观测性

健康检查

自动注册健康检查,访问 /health 即可查看 RabbitMQ 状态。

健康检查名称为 "rabbitmq",标签 ["rabbitmq", "messaging"]

三态:Healthy(全部连接正常)/ Degraded(部分连接断开)/ Unhealthy(全部断开)。

OpenTelemetry 追踪

ActivitySource 名称:Galosys.Foundation.RabbitMQ

Span 触发点 关键 Tag
rabbitmq.publish SendAsync / BroadcastAsync messaging.destination(RoutingKey)、W3C traceparent
rabbitmq.consume 消费回调 messaging.destination(RoutingKey)、Consumer Tag

指标

Meter 名称:Galosys.Foundation.RabbitMQ

指标名 类型 说明
rabbitmq.messages.published Counter 发布消息数
rabbitmq.messages.consumed Counter 消费消息数
rabbitmq.messages.rejected Counter 拒绝消息数
rabbitmq.messages.retried Counter 重试消息数
rabbitmq.messages.publish_latency Histogram 发布耗时(ms)
rabbitmq.messages.processing_duration Histogram 处理耗时(ms)
rabbitmq.batch.size Histogram 批量大小
rabbitmq.connections.active ObservableGauge 活跃连接数
rabbitmq.channels.active ObservableGauge 活跃 Channel 数

核心类

类名 说明
RabbitTemplate 统一消息模板(Facade),组合 RabbitPublisher + RabbitConsumer
RabbitPublisher 消息发布器,序列化 + 拓扑 + 入队 BatchPublishPipeline
RabbitConsumer 消息消费者,信号量并发 + 指数退避重试 + DLX/DLQ
RabbitBootstrapper 启动引导器,预热连接池 + 扫描 Handler + 优雅关闭
ConnectionPool 多连接池,Round-Robin Channel 分配 + 自动重连
IConnectionPool 连接池接口
ChannelLease Channel 租约(RAII,Dispose 归还)
BatchPublishPipeline 批量发布管道,有界 Channel + Publisher Confirms + 重试
PendingMessage 管道中的待发送消息(含 TCS 确认)
TopologyCache 拓扑缓存,ConcurrentDictionary + 断连重建
RabbitMQMeter OTel 指标(7 个 Counter/Histogram/Gauge)
RabbitMQTracing OTel 追踪(W3C traceparent 注入/提取)
RabbitMQHealthCheck 健康检查(聚合生产者/消费者池状态)
RabbitCloudBus ICloudBus 实现(广播发布)

依赖

  • Galosys.Foundation.Amqp(抽象层)
  • Galosys.Foundation.CoreICloudBusIMessageISnowflakeIdGeneratorIObjectSerializer
  • RabbitMQ.Client(NuGet)
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Galosys.Foundation.RabbitMQ.Client:

Package Downloads
Galosys.Foundation.RabbitMQ.Benchmarks

Galosys.Foundation快速开发库

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
26.5.20.1 92 5/20/2026
26.5.19.1 93 5/19/2026
26.5.18.1 100 5/18/2026
26.5.15.1 99 5/15/2026
26.5.12.3 94 5/12/2026
26.5.12.2 96 5/12/2026
26.4.27.1-rc1 112 4/26/2026
26.4.25.1-rc1 101 4/25/2026
26.4.22.2-rc7 93 4/22/2026
26.4.22.2-rc6 94 4/22/2026
26.4.22.2-rc4 92 4/22/2026
26.4.22.2-rc3 92 4/22/2026
26.4.19.1-rc1 133 4/19/2026