Galosys.Foundation.Amqp 26.5.20.1

dotnet add package Galosys.Foundation.Amqp --version 26.5.20.1
                    
NuGet\Install-Package Galosys.Foundation.Amqp -Version 26.5.20.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
                    
Directory.Packages.props
<PackageReference Include="Galosys.Foundation.Amqp" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Galosys.Foundation.Amqp --version 26.5.20.1
                    
#r "nuget: Galosys.Foundation.Amqp, 26.5.20.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Galosys.Foundation.Amqp@26.5.20.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1
                    
Install as a Cake Addin
#tool nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1
                    
Install as a Cake Tool

Galosys.Foundation.Amqp

成熟度: 🟢 稳定 — AMQP 抽象层,已被 RabbitMQ.Client 模块正式继承

AMQP 消息抽象层模块,定义统一的发送/消费抽象基类、配置模型、[AmqpHandler] 注解、Handler 自动扫描。用户代码仅依赖此模块的 API,切换底层实现只需改 PackageReference。

架构

Galosys.Foundation.Amqp                ← 本模块(抽象层)
    ▲
    │
Galosys.Foundation.RabbitMQ.Client     ← RabbitMQ 实现(ConnectionPool + BatchPublishPipeline + Publisher Confirms)

功能特性

  • 统一 APIAmqpTemplate + [AmqpHandler] + IMessageHandler<T>,切换实现零改动
  • 声明式消费[AmqpHandler("routingKey")] 方法级注解,启动时自动扫描注册
  • 广播消息BroadcastAsync fanout 发布
  • 延迟消息DelayAsync 支持 x-delayed-message 插件
  • 消息属性扩展AmqpProperties 流式 API:自定义 MessageId、Header、延迟、TTL
  • 生命周期管理AmqpBootstrapper(IHostedLifecycleService,自动扫描 Handler + 优雅关闭)
  • 每消息 DI Scope — 消费端自动创建独立 DI Scope,正确处理 Scoped 生命周期

安装

通常不直接安装此包,而是选择实现包:


<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />

快速开始

1. 最小配置

{
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/"
  }
}

AppName 默认取应用名称,通常无需显式配置。

2. 注册服务

// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
    .UseModularization()
    .Locate()
    .RunAsync();

// 方式二:手动注册(需同时注册实现包)
services.AddRabbitMQ(configuration);

3. 发送消息

using RabbitMQ.Client;

public class OrderCreatedMessage
{
    public string OrderNo { get; set; }
}

public class OrderService
{
    private readonly AmqpTemplate _amqp;

    public OrderService(AmqpTemplate amqp) => _amqp = amqp;

    // 单条发送
    public async Task SendAsync() =>
        await _amqp.SendAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });

    // 批量发送
    public async Task SendBatchAsync() =>
        await _amqp.SendAsync("order_created", new[]
        {
            new OrderCreatedMessage { OrderNo = "ORD001" },
            new OrderCreatedMessage { OrderNo = "ORD002" }
        });

    // 广播消息(fanout)
    public async Task BroadcastAsync() =>
        await _amqp.BroadcastAsync("order_events", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });

    // 延迟消息(需 x-delayed-message 插件)
    public async Task DelayAsync() =>
        await _amqp.DelayAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } }, delayMillionSeconds: 5000);
}

4. 消费消息

using RabbitMQ.Client;

[Handler]
public class OrderHandler : IMessageHandler
{
    [AmqpHandler("order_created")]
    public async Task<bool> HandleAsync(IMessage msg)
    {
        // 处理消息
        return true; // true = ACK, false = Nack
    }

    // 广播消费
    [AmqpHandler("order_events", Broadcast = true, Durable = true)]
    public async Task<bool> OnBroadcastAsync(IMessage msg)
    {
        return true;
    }
}

注册机制: 类标注 [Handler] → DI 自动注册;方法标注 [AmqpHandler] → 启动时自动扫描订阅。类需实现 IMessageHandlerIMessageHandler<T>

详细配置

完整配置示例

{
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "AutoCreateEnabled": true,
    "ConsumerDispatchConcurrency": 8,
    "NetworkRecoveryInterval": 10,
    "RequestedHeartbeat": 45,
    "ContinuationTimeout": 20,
    "Limit": {
      "ProducerMaxConnections": 4,
      "ConsumerMaxConnections": 4,
      "MaxMessageSize": 65536,
      "BatchSize": 100,
      "BatchTimeoutMs": 50,
      "PublishConfirmTimeoutMs": 3000,
      "PipelineCapacity": 5000
    },
    "Handlers": {
      "order_created": {
        "Concurrency": 4,
        "MaxRetryCount": 3,
        "RetryBaseDelayMs": 1000,
        "PrefetchMultiplier": 2,
        "DurableQueue": false,
        "QueueName": null
      }
    }
  }
}

根配置(AmqpOptions)

配置项 类型 默认值 说明
HostName string (必填) RabbitMQ 主机地址,多主机空格/逗号分隔,自动 Ping 选最优
UserName string? null 认证用户名
Password string? null 认证密码
VirtualHost string? "/" 虚拟主机
AppName string? 应用名 客户端连接名称
AutoCreateEnabled bool true 自动声明队列/交换器(阿里云限 TPS 场景可关闭)
Limit AmqpLimitOptions 见下表 连接池与限流配置
Handlers IDictionary<string, AmqpHandlerProperty> {} 按 RoutingKey 粒度的消费者配置
ConsumerDispatchConcurrency ushort CPU核心数*4 消费者调度并发数
NetworkRecoveryInterval int 10 自动重连间隔(秒)
RequestedHeartbeat int 45 心跳间隔(秒)
ContinuationTimeout int 20 RPC 续延超时(秒)

限流配置(AmqpLimitOptions)

配置项 类型 默认值 说明
ProducerMaxConnections int 4 生产者连接池大小
ProducerChannelMaxPerConnection int 10 每连接最大 Channel 数(生产者)
ConsumerMaxConnections int 4 消费者连接池大小
ConsumerChannelMaxPerConnection int 50 每连接最大 Channel 数(消费者)
MaxMessageSize int 65536 最大消息体字节数(64KB)
MaxTps int 0 最大 TPS(0=不限)
BatchSize int 100 批量发送大小
BatchTimeoutMs int 50 批量聚合等待时间(ms)
PublishConfirmTimeoutMs int 3000 Publisher Confirm 超时(ms)
RetryCount int 3 批量发送重试次数
RetryBaseDelayMs int 1000 重试退避基础延迟(ms)
PipelineCapacity int 5000 发送管道容量(有界 Channel)

Handler 配置(AmqpHandlerProperty)

配置项 类型 默认值 说明
Concurrency int 1 并发消费线程数
MaxRetryCount int 3 最大重试次数
RetryBaseDelayMs int 1000 指数退避基础延迟(ms)
PrefetchMultiplier int 2 实际 Prefetch = Concurrency × PrefetchMultiplier
DurableQueue bool false 队列是否持久化(广播模式)
QueueName string? null 自定义队列名(null=自动命名)

核心类

类名 说明
AmqpTemplate 消息操作模板(Facade),提供 SendAsyncBroadcastAsyncDelayAsyncHandleAsync
AmqpPublisher 消息发布器抽象基类
AmqpConsumer 消息消费者抽象基类
AmqpBootstrapper 启动引导器,自动扫描 Handler + 优雅关闭
AmqpOptions 配置选项类
AmqpLimitOptions 连接池限流配置
AmqpHandlerProperty 按 RoutingKey 的消费者配置
AmqpHandlerAttribute 消费者方法注解
AmqpProperties 消息属性,支持流式设置 Header/Delay/TTL
IAmqpProperties 消息属性接口

依赖

  • Galosys.Foundation.Core
  • Microsoft.Extensions.Hosting
  • Microsoft.Extensions.Options
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Galosys.Foundation.Amqp:

Package Downloads
Galosys.Foundation.EasyNetQ

Galosys.Foundation快速开发库

Galosys.Foundation.RabbitMQ.Client

Galosys.Foundation快速开发库

Galosys.Foundation.RabbitMQ.Benchmarks

Galosys.Foundation快速开发库

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
26.5.20.1 130 5/20/2026
26.5.19.1 128 5/19/2026
26.5.18.1 128 5/18/2026
26.5.15.1 126 5/15/2026
26.5.12.3 133 5/12/2026
26.5.12.2 127 5/12/2026
26.4.27.1-rc1 125 4/26/2026
26.4.25.1-rc1 119 4/25/2026
26.4.22.2-rc7 122 4/22/2026
26.4.22.2-rc6 123 4/22/2026
26.4.22.2-rc4 123 4/22/2026
26.4.22.2-rc3 123 4/22/2026
26.4.19.1-rc1 159 4/19/2026