Galosys.Foundation.RabbitMQ.Client
26.5.12.3
See the version list below for details.
dotnet add package Galosys.Foundation.RabbitMQ.Client --version 26.5.12.3
NuGet\Install-Package Galosys.Foundation.RabbitMQ.Client -Version 26.5.12.3
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="26.5.12.3" />
<PackageVersion Include="Galosys.Foundation.RabbitMQ.Client" Version="26.5.12.3" />
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" />
paket add Galosys.Foundation.RabbitMQ.Client --version 26.5.12.3
#r "nuget: Galosys.Foundation.RabbitMQ.Client, 26.5.12.3"
#:package Galosys.Foundation.RabbitMQ.Client@26.5.12.3
#addin nuget:?package=Galosys.Foundation.RabbitMQ.Client&version=26.5.12.3
#tool nuget:?package=Galosys.Foundation.RabbitMQ.Client&version=26.5.12.3
Galosys.Foundation.RabbitMQ.Client
成熟度: 🟢 稳定 — 生产级 RabbitMQ 客户端,连接池 + 批量管道 + Publisher Confirms + OTel 可观测性
基于 RabbitMQ.Client 的生产级实现,继承 Galosys.Foundation.Amqp 抽象层,提供连接池、批量发布管道、Publisher Confirms、拓扑缓存、DLX/DLQ 死信、OpenTelemetry 追踪/指标等企业级特性。
架构
Galosys.Foundation.Amqp ← 抽象层(AmqpTemplate / AmqpPublisher / AmqpConsumer)
▲
│
Galosys.Foundation.RabbitMQ.Client ← 本模块(RabbitTemplate / RabbitPublisher / RabbitConsumer)
功能特性
- 连接池 — 生产者/消费者独立连接池,Round-Robin Channel 分配,信号量限流
- 批量发布管道 —
BatchPublishPipeline(有界 Channel + 后台刷盘),Publisher Confirms 逐消息确认 - 自动拓扑管理 —
TopologyCache缓存已声明的队列/交换器,断连自动重建 - DLX/DLQ 死信 — 消费失败自动路由到死信交换器 + 死信队列
- 指数退避重试 — 消费端失败自动重试(x-delayed-message 插件),超限转入 DLQ
- 多主机支持 — Ping 选最优节点,自动网络恢复
- OTel 追踪 — W3C traceparent 传播,Producer/Consumer Span
- OTel 指标 — 7 个 Counter/Histogram/Gauge(发布数、消费数、延迟、活跃连接数等)
- 健康检查 —
RabbitMQHealthCheck聚合生产者/消费者连接池状态(Healthy/Degraded/Unhealthy) - ICloudBus — 通过
AmqpTemplate实现 Core 的ICloudBus接口
前置条件
- RabbitMQ 3.x+ 服务端
- 开启
rabbitmq_delayed_message_exchange插件(消费重试需要) - 建议开启
rabbitmq_management插件(运维监控)
安装
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />
快速开始
1. 最小配置
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/"
}
}
AppName默认取应用名称,AutoCreateEnabled默认 true 自动创建队列/交换器。
2. 注册服务
// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
.UseModularization()
.Locate()
.RunAsync();
// 方式二:手动注册
services.AddRabbitMQ(configuration);
3. 发送消息
using RabbitMQ.Client;
public class OrderCreatedMessage : IMessage
{
public long Id { get; set; }
public string? Key { get; set; }
public DateTime OccurredOn { get; set; }
public string OrderNo { get; set; }
}
public class OrderService
{
private readonly AmqpTemplate _amqp;
public OrderService(AmqpTemplate amqp) => _amqp = amqp;
// 普通消息(通过 BatchPublishPipeline → Publisher Confirms)
public async Task SendAsync() =>
await _amqp.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" });
// 广播消息(fanout)
public async Task BroadcastAsync() =>
await _amqp.BroadcastAsync("order_events", new OrderCreatedMessage { OrderNo = "ORD001" });
// 延迟消息
public async Task DelayAsync() =>
await _amqp.DelayAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" }, delayMillionSeconds: 5000);
// 自定义消息属性
public async Task SendWithPropsAsync() =>
await _amqp.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" },
postMessageProcessor: props =>
{
props.SetCustomProperty("x-correlation-id", "corr-123");
});
}
4. 消费消息
using RabbitMQ.Client;
[Handler]
public class OrderHandler : IMessageHandler
{
[AmqpHandler("order_created")]
public async Task<bool> HandleAsync(IMessage msg)
{
// 处理消息
return true; // true = ACK, false = Nack → 重试或进入 DLQ
}
// 广播消费(fanout + 持久化队列)
[AmqpHandler("order_events", Broadcast = true, Durable = true)]
public async Task<bool> OnEventAsync(IMessage msg)
{
return true;
}
}
注册机制: 类标注
[Handler]→ DI 自动注册;方法标注[AmqpHandler]→ 启动时自动扫描订阅。
5. 使用 ICloudBus
public class MyService
{
private readonly ICloudBus _bus;
public MyService(ICloudBus bus) => _bus = bus;
public async Task PublishAsync(MyEvent evt) =>
await _bus.PublishAsync("my_event", evt);
}
详细配置
完整配置示例
{
"RabbitMQ": {
"HostName": "rabbit1, rabbit2, rabbit3",
"UserName": "user",
"Password": "pass",
"VirtualHost": "/",
"AutoCreateEnabled": true,
"ConsumerDispatchConcurrency": 16,
"NetworkRecoveryInterval": 10,
"RequestedHeartbeat": 45,
"ContinuationTimeout": 20,
"Limit": {
"ProducerMaxConnections": 4,
"ProducerChannelMaxPerConnection": 10,
"ConsumerMaxConnections": 4,
"ConsumerChannelMaxPerConnection": 50,
"MaxMessageSize": 65536,
"MaxTps": 0,
"BatchSize": 100,
"BatchTimeoutMs": 50,
"PublishConfirmTimeoutMs": 3000,
"RetryCount": 3,
"RetryBaseDelayMs": 1000,
"PipelineCapacity": 5000
},
"Handlers": {
"order_created": {
"Concurrency": 4,
"MaxRetryCount": 3,
"RetryBaseDelayMs": 1000,
"PrefetchMultiplier": 2,
"DurableQueue": false,
"QueueName": null
}
}
}
}
配置项详细说明见 Amqp 模块文档。
可观测性
健康检查
自动注册健康检查,访问 /health 即可查看 RabbitMQ 状态。
健康检查名称为
"rabbitmq",标签["rabbitmq", "messaging"]。三态:Healthy(全部连接正常)/ Degraded(部分连接断开)/ Unhealthy(全部断开)。
OpenTelemetry 追踪
ActivitySource 名称:Galosys.Foundation.RabbitMQ
| Span | 触发点 | 关键 Tag |
|---|---|---|
rabbitmq.publish |
SendAsync / BroadcastAsync |
messaging.destination(RoutingKey)、W3C traceparent |
rabbitmq.consume |
消费回调 | messaging.destination(RoutingKey)、Consumer Tag |
指标
Meter 名称:Galosys.Foundation.RabbitMQ
| 指标名 | 类型 | 说明 |
|---|---|---|
rabbitmq.messages.published |
Counter | 发布消息数 |
rabbitmq.messages.consumed |
Counter | 消费消息数 |
rabbitmq.messages.rejected |
Counter | 拒绝消息数 |
rabbitmq.messages.retried |
Counter | 重试消息数 |
rabbitmq.messages.publish_latency |
Histogram | 发布耗时(ms) |
rabbitmq.messages.processing_duration |
Histogram | 处理耗时(ms) |
rabbitmq.batch.size |
Histogram | 批量大小 |
rabbitmq.connections.active |
ObservableGauge | 活跃连接数 |
rabbitmq.channels.active |
ObservableGauge | 活跃 Channel 数 |
核心类
| 类名 | 说明 |
|---|---|
RabbitTemplate |
统一消息模板(Facade),组合 RabbitPublisher + RabbitConsumer |
RabbitPublisher |
消息发布器,序列化 + 拓扑 + 入队 BatchPublishPipeline |
RabbitConsumer |
消息消费者,信号量并发 + 指数退避重试 + DLX/DLQ |
RabbitBootstrapper |
启动引导器,预热连接池 + 扫描 Handler + 优雅关闭 |
ConnectionPool |
多连接池,Round-Robin Channel 分配 + 自动重连 |
IConnectionPool |
连接池接口 |
ChannelLease |
Channel 租约(RAII,Dispose 归还) |
BatchPublishPipeline |
批量发布管道,有界 Channel + Publisher Confirms + 重试 |
PendingMessage |
管道中的待发送消息(含 TCS 确认) |
TopologyCache |
拓扑缓存,ConcurrentDictionary + 断连重建 |
RabbitMQMeter |
OTel 指标(7 个 Counter/Histogram/Gauge) |
RabbitMQTracing |
OTel 追踪(W3C traceparent 注入/提取) |
RabbitMQHealthCheck |
健康检查(聚合生产者/消费者池状态) |
RabbitCloudBus |
ICloudBus 实现(广播发布) |
依赖
Galosys.Foundation.Amqp(抽象层)Galosys.Foundation.Core(ICloudBus、IMessage、ISnowflakeIdGenerator、IObjectSerializer)RabbitMQ.Client(NuGet)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Galosys.Foundation.Amqp (>= 26.5.12.3)
- Galosys.Foundation.Core (>= 26.5.12.3)
- rabbitmq.client (>= 7.1.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Galosys.Foundation.RabbitMQ.Client:
| Package | Downloads |
|---|---|
|
Galosys.Foundation.RabbitMQ.Benchmarks
Galosys.Foundation快速开发库 |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 26.5.20.1 | 92 | 5/20/2026 |
| 26.5.19.1 | 93 | 5/19/2026 |
| 26.5.18.1 | 100 | 5/18/2026 |
| 26.5.15.1 | 99 | 5/15/2026 |
| 26.5.12.3 | 94 | 5/12/2026 |
| 26.5.12.2 | 96 | 5/12/2026 |
| 26.4.27.1-rc1 | 112 | 4/26/2026 |
| 26.4.25.1-rc1 | 101 | 4/25/2026 |
| 26.4.22.2-rc7 | 93 | 4/22/2026 |
| 26.4.22.2-rc6 | 94 | 4/22/2026 |
| 26.4.22.2-rc4 | 92 | 4/22/2026 |
| 26.4.22.2-rc3 | 92 | 4/22/2026 |
| 26.4.19.1-rc1 | 133 | 4/19/2026 |