Galosys.Foundation.RocketMQ.Abstractions
26.5.20.1
dotnet add package Galosys.Foundation.RocketMQ.Abstractions --version 26.5.20.1
NuGet\Install-Package Galosys.Foundation.RocketMQ.Abstractions -Version 26.5.20.1
<PackageReference Include="Galosys.Foundation.RocketMQ.Abstractions" Version="26.5.20.1" />
<PackageVersion Include="Galosys.Foundation.RocketMQ.Abstractions" Version="26.5.20.1" />
<PackageReference Include="Galosys.Foundation.RocketMQ.Abstractions" />
paket add Galosys.Foundation.RocketMQ.Abstractions --version 26.5.20.1
#r "nuget: Galosys.Foundation.RocketMQ.Abstractions, 26.5.20.1"
#:package Galosys.Foundation.RocketMQ.Abstractions@26.5.20.1
#addin nuget:?package=Galosys.Foundation.RocketMQ.Abstractions&version=26.5.20.1
#tool nuget:?package=Galosys.Foundation.RocketMQ.Abstractions&version=26.5.20.1
Galosys.Foundation.RocketMQ.Abstractions
成熟度: 🟢 稳定 — 核心功能 + 高级特性(事务/顺序/批量)已完成
RocketMQ 抽象层模块,定义统一的消息发送/消费抽象基类、配置模型、[RmqHandler] 注解、可观测性基类。用户代码仅依赖此模块的 API,切换底层实现(Apache / NewLife)只需改 PackageReference。
架构
Galosys.Foundation.RocketMQ.Abstractions ← 本模块(抽象层)
▲ ▲
│ │
Galosys.Foundation.RocketMQ.Client Galosys.Foundation.NewLife.RocketMQ
(Apache rocketmq.client 5.x) (NewLife.RocketMQ 3.0)
功能特性
- 统一 API —
RmqTemplate+[RmqHandler]+IMessageHandler<T>,切换实现零改动 - 声明式消费 —
[RmqHandler("topic", "tag")]方法级注解,自动扫描注册 - 事务消息 —
SendTransactionAsync半消息 + 本地事务 + Commit/Rollback - 顺序消息 —
SendOrderedAsync相同 shardingKey 路由到同一队列 - 批量发送 —
SendBatchAsync一次发送多条消息 - 可观测性基类 —
RmqTracing(OTel ActivitySource)+RmqMeter(6 个 Counter/Histogram) - 健康检查基类 —
RmqHealthCheck(IHealthCheck) - 生命周期管理 —
RmqBootstrapper(IHostedLifecycleService,自动扫描 + 优雅关闭)
安装
通常不直接安装此包,而是选择一个实现包(二选一):
<PackageReference Include="Galosys.Foundation.RocketMQ.Client" Version="x.x.x" />
<PackageReference Include="Galosys.Foundation.NewLife.RocketMQ" Version="x.x.x" />
快速开始
1. 最小配置
{
"RocketMQ": {
"Endpoints": "localhost:9876"
}
}
ConsumerGroup默认取应用名称并将.替换为_,通常无需显式配置。
2. 注册服务
// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
.UseModularization()
.Locate()
.RunAsync();
// 方式二:手动注册
services.AddRocketMQ(configuration);
3. 发送消息
using Galosys.Foundation.RocketMQ.Abstractions;
public class OrderCreatedMessage : IMessage
{
public long Id { get; set; }
public string? Key { get; set; }
public DateTime OccurredOn { get; set; }
public string OrderNo { get; set; }
}
public class OrderService
{
private readonly RmqTemplate _template;
public OrderService(RmqTemplate template) => _template = template;
// 普通消息
public async Task SendAsync() =>
await _template.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" });
// 指定 Tag
public async Task SendWithTagAsync() =>
await _template.SendAsync("order_created", tag: "new", new OrderCreatedMessage { OrderNo = "ORD001" });
// 延迟消息
public async Task SendDelayAsync() =>
await _template.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" }, delaySeconds: 30);
// 事务消息(半消息 → 本地事务 → Commit/Rollback)
public async Task SendTransactionAsync() =>
await _template.SendTransactionAsync(
new OrderCreatedMessage { OrderNo = "ORD001" },
"order_created", "new",
localTransaction: async () =>
{
// 执行本地事务(如扣库存、写订单)
return true; // true=Commit, false=Rollback
});
// 顺序消息(相同 shardingKey 路由到同一队列,保证消费顺序)
public async Task SendOrderedAsync() =>
await _template.SendOrderedAsync(
new OrderCreatedMessage { OrderNo = "ORD001" },
"order_created", shardingKey: "ORD001");
// 批量发送
public async Task SendBatchAsync() =>
await _template.SendBatchAsync(
new[]
{
new OrderCreatedMessage { OrderNo = "ORD001" },
new OrderCreatedMessage { OrderNo = "ORD002" },
},
"order_created");
}
4. 消费消息
using Galosys.Foundation.RocketMQ.Abstractions;
[Handler]
public class OrderHandler : IMessageHandler<OrderCreatedMessage>
{
[RmqHandler("order_created")]
public Task<bool> HandleAsync(OrderCreatedMessage msg)
{
// 处理消息
return Task.FromResult(true); // true = ACK, false = Nack(重新投递)
}
}
// 自定义消费组 + Tag 过滤
[Handler]
public class VipOrderHandler : IMessageHandler<OrderCreatedMessage>
{
[RmqHandler("order_created", "vip", ConsumerGroup = "vip_order_group")]
public Task<bool> HandleAsync(OrderCreatedMessage msg) => Task.FromResult(true);
}
注册机制: 类标注
[Handler]→ DI 自动注册;方法标注[RmqHandler]→ 启动时自动扫描订阅 Topic。类需实现IMessageHandler或IMessageHandler<T>。
详细配置
完整配置示例
{
"RocketMQ": {
"Endpoints": "localhost:9876",
"AccessKey": null,
"SecretKey": null,
"Ssl": false,
"RequestTimeout": 10,
"ConsumerGroup": "my_service",
"Handlers": {
"order_created": {
"PrefetchCount": 16,
"Concurrency": 4,
"MaxRetryCount": 3,
"RetryBaseDelayMs": 1000
}
}
}
}
根配置(RmqOptions)
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
Endpoints |
string? |
(必填) | NameServer 地址,多个用逗号分隔 |
AccessKey |
string? |
null |
ACL 认证 AccessKey |
SecretKey |
string? |
null |
ACL 认证 SecretKey |
Ssl |
bool |
false |
是否启用 SSL/TLS |
RequestTimeout |
int |
10 |
请求超时(秒) |
ConsumerGroup |
string |
应用名(. → _) |
默认消费组名称 |
Handlers |
IDictionary<string, RmqHandlerProperty> |
{} |
按 Topic 粒度的消费者配置 |
Handler 配置(RmqHandlerProperty)
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
PrefetchCount |
int |
16 |
每次拉取消息数量 |
Concurrency |
int |
1 |
并发处理线程数 |
MaxRetryCount |
int |
3 |
最大重试次数 |
RetryBaseDelayMs |
int |
1000 |
重试基础延迟(毫秒) |
可观测性
健康检查
实现模块在 AddRocketMQ() 中自动注册健康检查,访问 /health 即可查看 RocketMQ 状态。
健康检查名称为
"rocketmq"。
OpenTelemetry 追踪
ActivitySource 名称:Galosys.Foundation.RocketMQ
| Span | 触发点 | 关键 Tag |
|---|---|---|
rmq.send |
SendAsync |
messaging.destination(Topic)、messaging.rocketmq.tag |
rmq.consume |
消费回调 | messaging.destination(Topic)、messaging.rocketmq.consumer_group |
指标
Meter 名称:Galosys.Foundation.RocketMQ
| 指标名 | 类型 | 说明 |
|---|---|---|
rmq.messages.sent |
Counter | 发送消息数 |
rmq.messages.consumed |
Counter | 消费消息数 |
rmq.messages.rejected |
Counter | 拒绝消息数 |
rmq.messages.retried |
Counter | 重试消息数 |
rmq.send.duration |
Histogram | 发送耗时(ms) |
rmq.processing.duration |
Histogram | 处理耗时(ms) |
核心类
| 类名 | 说明 |
|---|---|
RmqTemplate |
消息操作模板,提供 SendAsync、SendTransactionAsync、SendOrderedAsync、SendBatchAsync、HandleAsync |
RmqPublisher |
消息发布器抽象基类 |
RmqConsumer |
消息消费者抽象基类 |
RmqBootstrapper |
启动引导器,自动扫描 Handler + 优雅关闭 |
RmqTracing |
OpenTelemetry 追踪抽象基类 |
RmqMeter |
统一 Meter 指标抽象基类 |
RmqHealthCheck |
健康检查抽象基类 |
RmqOptions |
配置选项类 |
RmqHandlerAttribute |
消费者方法注解 |
RocketModule |
IModule 入口,自动调用 AddRocketMQ |
依赖
Galosys.Foundation.CoreMicrosoft.Extensions.Diagnostics.HealthChecksMicrosoft.Extensions.HostingSystem.Diagnostics.Metrics
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Galosys.Foundation.Core (>= 26.5.20.1)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on Galosys.Foundation.RocketMQ.Abstractions:
| Package | Downloads |
|---|---|
|
Galosys.Foundation.RocketMQ.Client
Galosys.Foundation快速开发库 |
|
|
Galosys.Foundation.NewLife.RocketMQ
Galosys.Foundation快速开发库 |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 26.5.20.1 | 49 | 5/20/2026 |
| 26.5.19.1 | 63 | 5/19/2026 |
| 26.5.18.1 | 118 | 5/18/2026 |
| 26.5.15.1 | 116 | 5/15/2026 |
| 26.5.12.3 | 112 | 5/12/2026 |
| 26.5.12.2 | 120 | 5/12/2026 |
| 26.4.27.1-rc1 | 125 | 4/26/2026 |
| 26.4.25.1-rc1 | 124 | 4/25/2026 |
| 26.4.22.2-rc7 | 124 | 4/22/2026 |
| 26.4.22.2-rc6 | 115 | 4/22/2026 |
| 26.4.22.2-rc4 | 120 | 4/22/2026 |
| 26.4.22.2-rc3 | 107 | 4/22/2026 |
| 26.4.19.1-rc1 | 109 | 4/19/2026 |