Galosys.Foundation.Amqp
26.5.20.1
dotnet add package Galosys.Foundation.Amqp --version 26.5.20.1
NuGet\Install-Package Galosys.Foundation.Amqp -Version 26.5.20.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
<PackageReference Include="Galosys.Foundation.Amqp" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Galosys.Foundation.Amqp --version 26.5.20.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Galosys.Foundation.Amqp, 26.5.20.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Galosys.Foundation.Amqp@26.5.20.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1
#tool nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Galosys.Foundation.Amqp
成熟度: 🟢 稳定 — AMQP 抽象层,已被 RabbitMQ.Client 模块正式继承
AMQP 消息抽象层模块,定义统一的发送/消费抽象基类、配置模型、[AmqpHandler] 注解、Handler 自动扫描。用户代码仅依赖此模块的 API,切换底层实现只需改 PackageReference。
架构
Galosys.Foundation.Amqp ← 本模块(抽象层)
▲
│
Galosys.Foundation.RabbitMQ.Client ← RabbitMQ 实现(ConnectionPool + BatchPublishPipeline + Publisher Confirms)
功能特性
- 统一 API —
AmqpTemplate+[AmqpHandler]+IMessageHandler<T>,切换实现零改动 - 声明式消费 —
[AmqpHandler("routingKey")]方法级注解,启动时自动扫描注册 - 广播消息 —
BroadcastAsyncfanout 发布 - 延迟消息 —
DelayAsync支持 x-delayed-message 插件 - 消息属性扩展 —
AmqpProperties流式 API:自定义 MessageId、Header、延迟、TTL - 生命周期管理 —
AmqpBootstrapper(IHostedLifecycleService,自动扫描 Handler + 优雅关闭) - 每消息 DI Scope — 消费端自动创建独立 DI Scope,正确处理 Scoped 生命周期
安装
通常不直接安装此包,而是选择实现包:
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />
快速开始
1. 最小配置
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/"
}
}
AppName默认取应用名称,通常无需显式配置。
2. 注册服务
// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
.UseModularization()
.Locate()
.RunAsync();
// 方式二:手动注册(需同时注册实现包)
services.AddRabbitMQ(configuration);
3. 发送消息
using RabbitMQ.Client;
public class OrderCreatedMessage
{
public string OrderNo { get; set; }
}
public class OrderService
{
private readonly AmqpTemplate _amqp;
public OrderService(AmqpTemplate amqp) => _amqp = amqp;
// 单条发送
public async Task SendAsync() =>
await _amqp.SendAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });
// 批量发送
public async Task SendBatchAsync() =>
await _amqp.SendAsync("order_created", new[]
{
new OrderCreatedMessage { OrderNo = "ORD001" },
new OrderCreatedMessage { OrderNo = "ORD002" }
});
// 广播消息(fanout)
public async Task BroadcastAsync() =>
await _amqp.BroadcastAsync("order_events", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });
// 延迟消息(需 x-delayed-message 插件)
public async Task DelayAsync() =>
await _amqp.DelayAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } }, delayMillionSeconds: 5000);
}
4. 消费消息
using RabbitMQ.Client;
[Handler]
public class OrderHandler : IMessageHandler
{
[AmqpHandler("order_created")]
public async Task<bool> HandleAsync(IMessage msg)
{
// 处理消息
return true; // true = ACK, false = Nack
}
// 广播消费
[AmqpHandler("order_events", Broadcast = true, Durable = true)]
public async Task<bool> OnBroadcastAsync(IMessage msg)
{
return true;
}
}
注册机制: 类标注
[Handler]→ DI 自动注册;方法标注[AmqpHandler]→ 启动时自动扫描订阅。类需实现IMessageHandler或IMessageHandler<T>。
详细配置
完整配置示例
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/",
"AutoCreateEnabled": true,
"ConsumerDispatchConcurrency": 8,
"NetworkRecoveryInterval": 10,
"RequestedHeartbeat": 45,
"ContinuationTimeout": 20,
"Limit": {
"ProducerMaxConnections": 4,
"ConsumerMaxConnections": 4,
"MaxMessageSize": 65536,
"BatchSize": 100,
"BatchTimeoutMs": 50,
"PublishConfirmTimeoutMs": 3000,
"PipelineCapacity": 5000
},
"Handlers": {
"order_created": {
"Concurrency": 4,
"MaxRetryCount": 3,
"RetryBaseDelayMs": 1000,
"PrefetchMultiplier": 2,
"DurableQueue": false,
"QueueName": null
}
}
}
}
根配置(AmqpOptions)
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
HostName |
string |
(必填) | RabbitMQ 主机地址,多主机空格/逗号分隔,自动 Ping 选最优 |
UserName |
string? |
null |
认证用户名 |
Password |
string? |
null |
认证密码 |
VirtualHost |
string? |
"/" |
虚拟主机 |
AppName |
string? |
应用名 | 客户端连接名称 |
AutoCreateEnabled |
bool |
true |
自动声明队列/交换器(阿里云限 TPS 场景可关闭) |
Limit |
AmqpLimitOptions |
见下表 | 连接池与限流配置 |
Handlers |
IDictionary<string, AmqpHandlerProperty> |
{} |
按 RoutingKey 粒度的消费者配置 |
ConsumerDispatchConcurrency |
ushort |
CPU核心数*4 |
消费者调度并发数 |
NetworkRecoveryInterval |
int |
10 |
自动重连间隔(秒) |
RequestedHeartbeat |
int |
45 |
心跳间隔(秒) |
ContinuationTimeout |
int |
20 |
RPC 续延超时(秒) |
限流配置(AmqpLimitOptions)
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
ProducerMaxConnections |
int |
4 |
生产者连接池大小 |
ProducerChannelMaxPerConnection |
int |
10 |
每连接最大 Channel 数(生产者) |
ConsumerMaxConnections |
int |
4 |
消费者连接池大小 |
ConsumerChannelMaxPerConnection |
int |
50 |
每连接最大 Channel 数(消费者) |
MaxMessageSize |
int |
65536 |
最大消息体字节数(64KB) |
MaxTps |
int |
0 |
最大 TPS(0=不限) |
BatchSize |
int |
100 |
批量发送大小 |
BatchTimeoutMs |
int |
50 |
批量聚合等待时间(ms) |
PublishConfirmTimeoutMs |
int |
3000 |
Publisher Confirm 超时(ms) |
RetryCount |
int |
3 |
批量发送重试次数 |
RetryBaseDelayMs |
int |
1000 |
重试退避基础延迟(ms) |
PipelineCapacity |
int |
5000 |
发送管道容量(有界 Channel) |
Handler 配置(AmqpHandlerProperty)
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
Concurrency |
int |
1 |
并发消费线程数 |
MaxRetryCount |
int |
3 |
最大重试次数 |
RetryBaseDelayMs |
int |
1000 |
指数退避基础延迟(ms) |
PrefetchMultiplier |
int |
2 |
实际 Prefetch = Concurrency × PrefetchMultiplier |
DurableQueue |
bool |
false |
队列是否持久化(广播模式) |
QueueName |
string? |
null |
自定义队列名(null=自动命名) |
核心类
| 类名 | 说明 |
|---|---|
AmqpTemplate |
消息操作模板(Facade),提供 SendAsync、BroadcastAsync、DelayAsync、HandleAsync |
AmqpPublisher |
消息发布器抽象基类 |
AmqpConsumer |
消息消费者抽象基类 |
AmqpBootstrapper |
启动引导器,自动扫描 Handler + 优雅关闭 |
AmqpOptions |
配置选项类 |
AmqpLimitOptions |
连接池限流配置 |
AmqpHandlerProperty |
按 RoutingKey 的消费者配置 |
AmqpHandlerAttribute |
消费者方法注解 |
AmqpProperties |
消息属性,支持流式设置 Header/Delay/TTL |
IAmqpProperties |
消息属性接口 |
依赖
Galosys.Foundation.CoreMicrosoft.Extensions.HostingMicrosoft.Extensions.Options
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net8.0
- Galosys.Foundation.Core (>= 26.5.20.1)
NuGet packages (3)
Showing the top 3 NuGet packages that depend on Galosys.Foundation.Amqp:
| Package | Downloads |
|---|---|
|
Galosys.Foundation.EasyNetQ
Galosys.Foundation快速开发库 |
|
|
Galosys.Foundation.RabbitMQ.Client
Galosys.Foundation快速开发库 |
|
|
Galosys.Foundation.RabbitMQ.Benchmarks
Galosys.Foundation快速开发库 |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 26.5.20.1 | 130 | 5/20/2026 |
| 26.5.19.1 | 128 | 5/19/2026 |
| 26.5.18.1 | 128 | 5/18/2026 |
| 26.5.15.1 | 126 | 5/15/2026 |
| 26.5.12.3 | 133 | 5/12/2026 |
| 26.5.12.2 | 127 | 5/12/2026 |
| 26.4.27.1-rc1 | 125 | 4/26/2026 |
| 26.4.25.1-rc1 | 119 | 4/25/2026 |
| 26.4.22.2-rc7 | 122 | 4/22/2026 |
| 26.4.22.2-rc6 | 123 | 4/22/2026 |
| 26.4.22.2-rc4 | 123 | 4/22/2026 |
| 26.4.22.2-rc3 | 123 | 4/22/2026 |
| 26.4.19.1-rc1 | 159 | 4/19/2026 |