Galosys.Foundation.RocketMQ.Abstractions 26.5.20.1

dotnet add package Galosys.Foundation.RocketMQ.Abstractions --version 26.5.20.1
                    
NuGet\Install-Package Galosys.Foundation.RocketMQ.Abstractions -Version 26.5.20.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Galosys.Foundation.RocketMQ.Abstractions" Version="26.5.20.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Galosys.Foundation.RocketMQ.Abstractions" Version="26.5.20.1" />
                    
Directory.Packages.props
<PackageReference Include="Galosys.Foundation.RocketMQ.Abstractions" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Galosys.Foundation.RocketMQ.Abstractions --version 26.5.20.1
                    
#r "nuget: Galosys.Foundation.RocketMQ.Abstractions, 26.5.20.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Galosys.Foundation.RocketMQ.Abstractions@26.5.20.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Galosys.Foundation.RocketMQ.Abstractions&version=26.5.20.1
                    
Install as a Cake Addin
#tool nuget:?package=Galosys.Foundation.RocketMQ.Abstractions&version=26.5.20.1
                    
Install as a Cake Tool

Galosys.Foundation.RocketMQ.Abstractions

成熟度: 🟢 稳定 — 核心功能 + 高级特性(事务/顺序/批量)已完成

RocketMQ 抽象层模块,定义统一的消息发送/消费抽象基类、配置模型、[RmqHandler] 注解、可观测性基类。用户代码仅依赖此模块的 API,切换底层实现(Apache / NewLife)只需改 PackageReference。

架构

Galosys.Foundation.RocketMQ.Abstractions  ← 本模块(抽象层)
    ▲                    ▲
    │                    │
Galosys.Foundation.RocketMQ.Client      Galosys.Foundation.NewLife.RocketMQ
(Apache rocketmq.client 5.x)            (NewLife.RocketMQ 3.0)

功能特性

  • 统一 APIRmqTemplate + [RmqHandler] + IMessageHandler<T>,切换实现零改动
  • 声明式消费[RmqHandler("topic", "tag")] 方法级注解,自动扫描注册
  • 事务消息SendTransactionAsync 半消息 + 本地事务 + Commit/Rollback
  • 顺序消息SendOrderedAsync 相同 shardingKey 路由到同一队列
  • 批量发送SendBatchAsync 一次发送多条消息
  • 可观测性基类RmqTracing(OTel ActivitySource)+ RmqMeter(6 个 Counter/Histogram)
  • 健康检查基类RmqHealthCheck(IHealthCheck)
  • 生命周期管理RmqBootstrapper(IHostedLifecycleService,自动扫描 + 优雅关闭)

安装

通常不直接安装此包,而是选择一个实现包(二选一):


<PackageReference Include="Galosys.Foundation.RocketMQ.Client" Version="x.x.x" />


<PackageReference Include="Galosys.Foundation.NewLife.RocketMQ" Version="x.x.x" />

快速开始

1. 最小配置

{
  "RocketMQ": {
    "Endpoints": "localhost:9876"
  }
}

ConsumerGroup 默认取应用名称并将 . 替换为 _,通常无需显式配置。

2. 注册服务

// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
    .UseModularization()
    .Locate()
    .RunAsync();

// 方式二:手动注册
services.AddRocketMQ(configuration);

3. 发送消息

using Galosys.Foundation.RocketMQ.Abstractions;

public class OrderCreatedMessage : IMessage
{
    public long Id { get; set; }
    public string? Key { get; set; }
    public DateTime OccurredOn { get; set; }
    public string OrderNo { get; set; }
}

public class OrderService
{
    private readonly RmqTemplate _template;

    public OrderService(RmqTemplate template) => _template = template;

    // 普通消息
    public async Task SendAsync() =>
        await _template.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" });

    // 指定 Tag
    public async Task SendWithTagAsync() =>
        await _template.SendAsync("order_created", tag: "new", new OrderCreatedMessage { OrderNo = "ORD001" });

    // 延迟消息
    public async Task SendDelayAsync() =>
        await _template.SendAsync("order_created", new OrderCreatedMessage { OrderNo = "ORD001" }, delaySeconds: 30);

    // 事务消息(半消息 → 本地事务 → Commit/Rollback)
    public async Task SendTransactionAsync() =>
        await _template.SendTransactionAsync(
            new OrderCreatedMessage { OrderNo = "ORD001" },
            "order_created", "new",
            localTransaction: async () =>
            {
                // 执行本地事务(如扣库存、写订单)
                return true; // true=Commit, false=Rollback
            });

    // 顺序消息(相同 shardingKey 路由到同一队列,保证消费顺序)
    public async Task SendOrderedAsync() =>
        await _template.SendOrderedAsync(
            new OrderCreatedMessage { OrderNo = "ORD001" },
            "order_created", shardingKey: "ORD001");

    // 批量发送
    public async Task SendBatchAsync() =>
        await _template.SendBatchAsync(
            new[]
            {
                new OrderCreatedMessage { OrderNo = "ORD001" },
                new OrderCreatedMessage { OrderNo = "ORD002" },
            },
            "order_created");
}

4. 消费消息

using Galosys.Foundation.RocketMQ.Abstractions;

[Handler]
public class OrderHandler : IMessageHandler<OrderCreatedMessage>
{
    [RmqHandler("order_created")]
    public Task<bool> HandleAsync(OrderCreatedMessage msg)
    {
        // 处理消息
        return Task.FromResult(true); // true = ACK, false = Nack(重新投递)
    }
}

// 自定义消费组 + Tag 过滤
[Handler]
public class VipOrderHandler : IMessageHandler<OrderCreatedMessage>
{
    [RmqHandler("order_created", "vip", ConsumerGroup = "vip_order_group")]
    public Task<bool> HandleAsync(OrderCreatedMessage msg) => Task.FromResult(true);
}

注册机制: 类标注 [Handler] → DI 自动注册;方法标注 [RmqHandler] → 启动时自动扫描订阅 Topic。类需实现 IMessageHandlerIMessageHandler<T>

详细配置

完整配置示例

{
  "RocketMQ": {
    "Endpoints": "localhost:9876",
    "AccessKey": null,
    "SecretKey": null,
    "Ssl": false,
    "RequestTimeout": 10,
    "ConsumerGroup": "my_service",
    "Handlers": {
      "order_created": {
        "PrefetchCount": 16,
        "Concurrency": 4,
        "MaxRetryCount": 3,
        "RetryBaseDelayMs": 1000
      }
    }
  }
}

根配置(RmqOptions)

配置项 类型 默认值 说明
Endpoints string? (必填) NameServer 地址,多个用逗号分隔
AccessKey string? null ACL 认证 AccessKey
SecretKey string? null ACL 认证 SecretKey
Ssl bool false 是否启用 SSL/TLS
RequestTimeout int 10 请求超时(秒)
ConsumerGroup string 应用名(._ 默认消费组名称
Handlers IDictionary<string, RmqHandlerProperty> {} 按 Topic 粒度的消费者配置

Handler 配置(RmqHandlerProperty)

配置项 类型 默认值 说明
PrefetchCount int 16 每次拉取消息数量
Concurrency int 1 并发处理线程数
MaxRetryCount int 3 最大重试次数
RetryBaseDelayMs int 1000 重试基础延迟(毫秒)

可观测性

健康检查

实现模块在 AddRocketMQ() 中自动注册健康检查,访问 /health 即可查看 RocketMQ 状态。

健康检查名称为 "rocketmq"

OpenTelemetry 追踪

ActivitySource 名称:Galosys.Foundation.RocketMQ

Span 触发点 关键 Tag
rmq.send SendAsync messaging.destination(Topic)、messaging.rocketmq.tag
rmq.consume 消费回调 messaging.destination(Topic)、messaging.rocketmq.consumer_group

指标

Meter 名称:Galosys.Foundation.RocketMQ

指标名 类型 说明
rmq.messages.sent Counter 发送消息数
rmq.messages.consumed Counter 消费消息数
rmq.messages.rejected Counter 拒绝消息数
rmq.messages.retried Counter 重试消息数
rmq.send.duration Histogram 发送耗时(ms)
rmq.processing.duration Histogram 处理耗时(ms)

核心类

类名 说明
RmqTemplate 消息操作模板,提供 SendAsyncSendTransactionAsyncSendOrderedAsyncSendBatchAsyncHandleAsync
RmqPublisher 消息发布器抽象基类
RmqConsumer 消息消费者抽象基类
RmqBootstrapper 启动引导器,自动扫描 Handler + 优雅关闭
RmqTracing OpenTelemetry 追踪抽象基类
RmqMeter 统一 Meter 指标抽象基类
RmqHealthCheck 健康检查抽象基类
RmqOptions 配置选项类
RmqHandlerAttribute 消费者方法注解
RocketModule IModule 入口,自动调用 AddRocketMQ

依赖

  • Galosys.Foundation.Core
  • Microsoft.Extensions.Diagnostics.HealthChecks
  • Microsoft.Extensions.Hosting
  • System.Diagnostics.Metrics
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (2)

Showing the top 2 NuGet packages that depend on Galosys.Foundation.RocketMQ.Abstractions:

Package Downloads
Galosys.Foundation.RocketMQ.Client

Galosys.Foundation快速开发库

Galosys.Foundation.NewLife.RocketMQ

Galosys.Foundation快速开发库

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
26.5.20.1 49 5/20/2026
26.5.19.1 63 5/19/2026
26.5.18.1 118 5/18/2026
26.5.15.1 116 5/15/2026
26.5.12.3 112 5/12/2026
26.5.12.2 120 5/12/2026
26.4.27.1-rc1 125 4/26/2026
26.4.25.1-rc1 124 4/25/2026
26.4.22.2-rc7 124 4/22/2026
26.4.22.2-rc6 115 4/22/2026
26.4.22.2-rc4 120 4/22/2026
26.4.22.2-rc3 107 4/22/2026
26.4.19.1-rc1 109 4/19/2026