InMemoryWorkerBalancer 1.0.0-proview.1.5
Additional Details
此包已经弃用请用 FlexibleAckDispatcher.InMemory.Core
This is a prerelease version of InMemoryWorkerBalancer.
dotnet add package InMemoryWorkerBalancer --version 1.0.0-proview.1.5
NuGet\Install-Package InMemoryWorkerBalancer -Version 1.0.0-proview.1.5
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="InMemoryWorkerBalancer" Version="1.0.0-proview.1.5" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="InMemoryWorkerBalancer" Version="1.0.0-proview.1.5" />
<PackageReference Include="InMemoryWorkerBalancer" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add InMemoryWorkerBalancer --version 1.0.0-proview.1.5
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: InMemoryWorkerBalancer, 1.0.0-proview.1.5"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package InMemoryWorkerBalancer@1.0.0-proview.1.5
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=InMemoryWorkerBalancer&version=1.0.0-proview.1.5&prerelease
#tool nuget:?package=InMemoryWorkerBalancer&version=1.0.0-proview.1.5&prerelease
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
FlexibleAckDispatcher
一个高性能、灵活的内存消息调度系统,基于 .NET 实现的发布-订阅模式(Pub/Sub),支持灵活的消息确认机制、并发控制和负载均衡。
🌟 核心特性
- ✅ 灵活的 ACK 机制:支持在处理函数内部或外部手动确认消息,类似 RabbitMQ 的模式
- 🚀 高性能异步处理:基于
System.Threading.Channels实现的高效消息队列 - 🔄 智能负载均衡:多订阅者自动分发消息,实现 Worker 级别的负载均衡与连接池
- 🎯 细粒度并发控制:支持 Prefetch(预取数量)和 ConcurrencyLimit(并发限制)双重控制
- ⏱️ 超时保护机制:内置消息处理超时,自动释放 Worker 槽位,防止阻塞
- 📊 丰富运行时指标:实时监控订阅者数量、空闲 Worker 数、执行中任务数以及 Worker 快照
- 🛠️ 可配默认策略:通过
PubSubManagerOptions统一下发默认的 Prefetch、并发限制、处理超时与 ACK 超时 - 🔒 更安全的载荷控制:内置最大载荷尺寸限制(默认 10 MiB),避免异常数据冲击内存
- 🔌 动态热插拔:支持运行时动态添加和移除订阅者
- 🛡️ 失败保护:连续失败阈值,达到限制后自动停止 Worker,防止级联故障
📦 安装
NuGet 包管理器
dotnet add package InMemoryWorkerBalancer
Package Manager Console
Install-Package InMemoryWorkerBalancer
.csproj 文件
<PackageReference Include="InMemoryWorkerBalancer" Version="1.0.0" />
📂 项目结构
FlexibleAckDispatcher/
├── src/
│ └── InMemoryWorkerBalancer/ # 核心库
│ ├── Abstractions/ # 对外接口
│ │ ├── IWorkerMessageHandler.cs # 消息处理器接口
│ │ ├── IPubSubChannel.cs # 发布通道接口
│ │ ├── IPubSubManager.cs # 管理器接口
│ │ ├── IPubSubSubscription.cs # 订阅句柄接口
│ │ └── IWorkerPayloadSerializer.cs # 负载序列化器接口
│ ├── Internal/ # 内部实现
│ │ ├── WorkerDispatcher.cs # 消息调度器
│ │ ├── WorkerEndpoint.cs # Worker 端点
│ │ ├── WorkerManager.cs # Worker 管理器
│ │ ├── WorkerProcessor.cs # Worker 处理器
│ │ ├── WorkerDeliveryContext.cs # 消息传递上下文
│ │ ├── WorkerCapacity.cs # Worker 容量控制
│ │ ├── WorkerAckToken.cs # ACK 令牌
│ │ ├── PubSubSubscription.cs # 订阅实现
│ │ └── SnowflakeIdGenerator.cs # 雪花 ID 生成器
│ ├── JsonWorkerPayloadSerializer.cs # 默认 JSON 序列化器
│ ├── PubSubManager.cs # 发布订阅管理器(核心)
│ ├── PubSubManagerOptions.cs # 管理器配置选项
│ ├── SubscriptionOptions.cs # 订阅配置选项
│ ├── WorkerEndpointSnapshot.cs # Worker 快照结构
│ ├── WorkerMessage.cs # 消息包装器
│ └── WorkerProcessingDelegate.cs # 处理委托
└── Test/
└── TestWorkerBalancerPubSub.cs # 综合测试
🏗️ 架构设计
核心组件
- PubSubManager: 消息发布-订阅管理器,负责整体调度和资源管理
- WorkerManager: Worker 生命周期管理,维护 Worker 池和负载均衡
- WorkerDispatcher: 消息分发器,将消息路由到空闲的 Worker
- WorkerProcessor: Worker 处理器,从专属通道读取并处理消息
- TypedPubSubChannel: 类型化的发布通道,支持泛型消息
工作流程
- 订阅阶段:调用
SubscribeAsync创建 Worker,注册到 WorkerManager - 发布阶段:调用
PublishAsync将消息序列化后写入主通道 - 调度阶段:WorkerDispatcher 从主通道读取消息,分发给空闲 Worker
- 处理阶段:WorkerProcessor 从 Worker 专属通道读取消息并处理
- 确认阶段:调用
AckAsync释放 Worker 槽位,允许处理下一条消息
🚀 快速开始
1. 基本用法
using InMemoryWorkerBalancer;
// 创建 PubSubManager,使用默认 JSON 序列化和 NullLogger
await using var manager = PubSubManager.Create();
// 订阅消息并立即 ACK
await manager.SubscribeAsync<int>(async (message, cancellationToken) =>
{
Console.WriteLine($"处理消息: {message.Payload}");
await message.AckAsync(); // 必须调用 ACK 以释放 Worker 槽位
});
// 发布消息
for (int i = 0; i < 10; i++)
{
await manager.PublishAsync(i);
}
await Task.Delay(1000); // 等待处理完成
2. 配置管理器选项
await using var manager = PubSubManager.Create(options => options
.WithLogger(logger) // 配置日志记录器
.WithSerializer(customSerializer) // 配置自定义序列化器
.WithDefaultPrefetch(8) // 设置统一默认的 Prefetch
.WithDefaultConcurrencyLimit(4) // 统一默认并发上限
.WithDefaultHandlerTimeout(TimeSpan.FromSeconds(45)) // 默认处理超时
.WithDefaultFailureThreshold(5) // 默认失败阈值
.WithDefaultAckTimeout(TimeSpan.FromMinutes(5)) // 默认 ACK 超时
.WithAckMonitorInterval(TimeSpan.FromMilliseconds(200))// 调整 ACK 超时轮询频率
.OnWorkerAddedHandler(async snapshot => // Worker 添加事件
{
Console.WriteLine($"Worker {snapshot.Id} ({snapshot.Name}) 已加入");
})
.OnWorkerRemovedHandler(async snapshot => // Worker 移除事件
{
Console.WriteLine($"Worker {snapshot.Id} 已离开");
}));
3. 配置订阅选项
await manager.SubscribeAsync<int>(
async (message, cancellationToken) =>
{
Console.WriteLine($"Worker {message.WorkerId} 处理: {message.Payload}");
Console.WriteLine($"开始处理时间: {message.StartedAt:O}");
await Task.Delay(100, cancellationToken);
await message.AckAsync();
},
options => options
.WithName("OrderProcessor") // Worker 名称
.WithPrefetch(10) // 预取 10 条消息
.WithConcurrencyLimit(5) // 最大并发 5
.WithHandlerTimeout(TimeSpan.FromSeconds(30)) // 30秒超时
.WithFailureThreshold(3) // 3次失败后停止
.WithAckTimeout(TimeSpan.FromMinutes(10))); // 10 分钟未 ACK 自动回收
参数说明:
- Prefetch: Worker 专属通道的容量,控制预取消息数量
- ConcurrencyLimit: Worker 内部最大并发任务数(≤ Prefetch)
- HandlerTimeout: 单条消息的最大处理时间
- FailureThreshold: Worker 连续失败次数阈值
- AckTimeout: 消息在规定时间内未确认则自动释放,避免占用槽位
4. 外部手动 ACK(类似 RabbitMQ)
var deliveryTags = new List<long>();
await manager.SubscribeAsync<int>(async (message, cancellationToken) =>
{
// 保存 deliveryTag,不立即 ACK
deliveryTags.Add(message.DeliveryTag);
Console.WriteLine($"收到消息: {message.Payload},deliveryTag: {message.DeliveryTag}");
// 注意:不在此处调用 AckAsync()
});
await manager.PublishAsync(42);
await Task.Delay(100); // 等待消息被投递
// 稍后在外部确认
if (deliveryTags.Count > 0)
{
await manager.AckAsync(deliveryTags[0]);
}
// 或者使用 AckTimeout 自动回收
await manager.SubscribeAsync<int>(
async (message, ct) =>
{
Console.WriteLine($"收到: {message.Payload}");
// 不 ACK,等待 AckTimeout 自动释放
},
opts => opts.WithPrefetch(1).WithAckTimeout(TimeSpan.FromMinutes(5)));
5. 多订阅者负载均衡
// 创建 3 个订阅者,消息将自动负载均衡
for (int i = 0; i < 3; i++)
{
int workerIndex = i;
await manager.SubscribeAsync<int>(
async (message, cancellationToken) =>
{
Console.WriteLine($"订阅者 {workerIndex} (WorkerId={message.WorkerId}) 处理: {message.Payload}");
Console.WriteLine($"开始处理时间: {message.StartedAt:O}");
await Task.Delay(100, cancellationToken);
await message.AckAsync();
},
options => options
.WithName($"Worker-{i}")
.WithPrefetch(5));
}
// 发布 30 条消息,将自动分发到 3 个订阅者
for (int i = 0; i < 30; i++)
{
await manager.PublishAsync(i);
}
6. 使用接口方式(推荐用于复杂业务)
using InMemoryWorkerBalancer.Abstractions;
public class OrderMessageHandler : IWorkerMessageHandler<int>
{
public async Task HandleAsync(WorkerMessage<int> message, CancellationToken cancellationToken)
{
Console.WriteLine($"Worker {message.WorkerId} 处理订单: {message.Payload}");
await ProcessOrderAsync(message.Payload, cancellationToken);
await message.AckAsync(); // 处理完成后确认
}
private async Task ProcessOrderAsync(int orderId, CancellationToken ct)
{
// 业务逻辑
await Task.Delay(100, ct);
}
}
var handler = new OrderMessageHandler();
await manager.SubscribeAsync<int>(handler, options => options.WithPrefetch(5));
7. 动态订阅与取消订阅
// 添加订阅
var subscription1 = await manager.SubscribeAsync<int>(
async (message, ct) =>
{
Console.WriteLine($"Worker {message.WorkerId}: {message.Payload}");
await message.AckAsync();
},
options => options.WithName("DynamicWorker").WithPrefetch(3));
await Task.Delay(1000);
// 发布一些消息
for (int i = 0; i < 10; i++)
{
await manager.PublishAsync(i);
}
// 取消订阅
await subscription1.DisposeAsync();
Console.WriteLine($"剩余订阅者数: {manager.SubscriberCount}");
🔐 高级配置与安全性
- ACK 超时监控:默认根据订阅的
AckTimeout自动推导轮询间隔,可通过WithAckMonitorInterval精细化控制。内部采用按需启动的监控任务,并限制待处理 ACK 的缓存数量,避免资源膨胀。 - 统一默认值:
WithDefaultPrefetch、WithDefaultConcurrencyLimit、WithDefaultHandlerTimeout、WithDefaultAckTimeout等方法可以集中下发约束,新订阅若未覆写将自动继承。 - 消息载荷保护:
JsonWorkerPayloadSerializer在序列化/反序列化阶段都会校验载荷长度(默认 4 MiB)。如需处理更大数据,请显式传入更高的maxPayloadSize或自定义实现。 - 任务计时:
WorkerMessage.StartedAt暴露任务开始时间,配合AckTimeout或自定义监控可以统计处理耗时、排查慢任务。 - 日志控制:Worker 生命周期相关日志默认为 Debug 级别,可通过注入的
ILogger调整过滤级别或自定义输出。
📊 运行时观测与监控
获取基本指标
// 订阅者数量
Console.WriteLine($"当前订阅者数: {manager.SubscriberCount}");
// 空闲 Worker 数(可用于负载判断)
Console.WriteLine($"空闲 Worker 数: {manager.IdleCount}");
Console.WriteLine($"正在处理任务数: {manager.RunningCount}");
Console.WriteLine($"队列中等待的 Worker 数: {manager.QueueCount}");
Console.WriteLine($"累计调度任务数: {manager.DispatchedCount}");
Console.WriteLine($"累计完成任务数: {manager.CompletedCount}");
// 获取所有 Worker 的详细快照
var snapshots = manager.GetSnapshot();
foreach (var snapshot in snapshots)
{
Console.WriteLine(
$"Worker {snapshot.Id} ({snapshot.Name ?? "未命名"}): " +
$"Active={snapshot.IsActive}, " +
$"Concurrent={snapshot.CurrentConcurrency}/{snapshot.MaxConcurrency}, " +
$"Timeout={snapshot.HandlerTimeout}, " +
$"Fault={snapshot.Fault?.Message ?? "None"}");
}
实现健康检查
public class HealthCheck
{
public static async Task<bool> CheckHealthAsync(IPubSubManager manager)
{
var snapshots = manager.GetSnapshot();
// 检查所有 Worker 是否活跃
foreach (var snapshot in snapshots)
{
if (!snapshot.IsActive || snapshot.Fault != null)
{
Console.WriteLine($"警告: Worker {snapshot.Id} 状态异常");
return false;
}
}
// 检查是否有空闲 Worker
if (manager.IdleCount == 0 && manager.RunningCount > 0)
{
Console.WriteLine("警告: 所有 Worker 都在忙,可能处理能力不足");
return false;
}
return true;
}
}
// 使用示例
var isHealthy = await HealthCheck.CheckHealthAsync(manager);
Console.WriteLine($"系统健康状态: {(isHealthy ? "正常" : "异常")}");
⚠️ 重要注意事项
1. 消息确认是必需的
⚠️ 未确认的消息会一直占用 Worker 槽位,导致系统阻塞!
// ❌ 错误示例:忘记 ACK,导致 Worker 阻塞
await manager.SubscribeAsync<int>(async (message, ct) =>
{
Console.WriteLine($"处理: {message.Payload}");
// 忘记调用 message.AckAsync()!
});
// ✅ 正确示例:及时 ACK
await manager.SubscribeAsync<int>(async (message, ct) =>
{
Console.WriteLine($"处理: {message.Payload}");
await message.AckAsync(); // 必须调用
});
2. 避免重复确认
var deliveryTag = 0L;
await manager.SubscribeAsync<int>(async (message, ct) =>
{
deliveryTag = message.DeliveryTag;
await message.AckAsync();
});
await manager.PublishAsync(1);
await Task.Delay(100);
// ✅ 第一次确认 - 成功
await manager.AckAsync(deliveryTag);
// ❌ 第二次确认 - 会抛出异常
try
{
await manager.AckAsync(deliveryTag); // InvalidOperationException
}
catch (InvalidOperationException ex)
{
Console.WriteLine(ex.Message); // "未找到待确认的消息"
}
3. Prefetch 与内存权衡
// Prefetch 越大,内存占用越多,但处理延迟可能降低
// 建议:根据实际消息大小和处理时间调整
// 小消息,快速处理
options => options.WithPrefetch(100)
// 大消息,慢速处理
options => options.WithPrefetch(5)
4. 并发限制与资源消耗
// ConcurrencyLimit 控制 Worker 内部的并发任务数
// 过高可能导致资源耗尽,过低可能导致吞吐量不足
// 推荐:根据 CPU 核心数和 I/O 特点设置
options => options
.WithPrefetch(10)
.WithConcurrencyLimit(4) // CPU 密集型设为 CPU 核心数
// I/O 密集型可以设更高
5. 超时设置要合理
// 超时时间应略大于实际平均处理时间
options => options
.WithHandlerTimeout(TimeSpan.FromSeconds(30)) // 根据业务调整
6. 泛型类型必须明确
// ❌ 错误:无法推断类型
await manager.SubscribeAsync(async (msg, ct) => { });
// ✅ 正确:显式指定类型
await manager.SubscribeAsync<int>(async (msg, ct) => { });
await manager.SubscribeAsync<string>(async (msg, ct) => { });
await manager.SubscribeAsync<Order>(async (msg, ct) => { });
7. 正确处理异常
await manager.SubscribeAsync<int>(async (message, cancellationToken) =>
{
try
{
await ProcessMessageAsync(message.Payload, cancellationToken);
await message.AckAsync();
}
catch (Exception ex)
{
Console.WriteLine($"处理失败: {ex.Message}");
// 不调用 Ack,消息会被重新投递或记录到失败队列
}
});
8. 使用 CancellationToken
await manager.SubscribeAsync<int>(async (message, cancellationToken) =>
{
// 始终检查取消令牌
cancellationToken.ThrowIfCancellationRequested();
await ProcessAsync(message.Payload, cancellationToken);
await message.AckAsync();
});
🧪 测试
仓库包含覆盖常见场景的综合单元测试,包括:
- ✅ 基本消息处理
- ✅ 外部手动 ACK
- ✅ Prefetch 限制验证
- ✅ 并发控制验证
- ✅ 超时保护验证
- ✅ 多订阅者负载均衡
- ✅ 动态订阅管理
- ✅ 运行时指标监控
运行测试:
dotnet test
📊 性能特点
核心性能优化
零拷贝设计
- 使用
ReadOnlyMemory<byte>传递消息,避免不必要的内存拷贝 - 基于
System.Threading.Channels实现高效的消息队列
- 使用
无锁并发
- 核心路径使用
ConcurrentDictionary和Channel - 减少锁竞争,提升高并发场景性能
- 核心路径使用
异步优先
- 全异步 API,充分利用 .NET 异步机制
- 异步发布、订阅、处理,不阻塞线程池
内存高效
- 使用对象池和可重用结构减少 GC 压力
- 延迟分配,按需创建对象
性能指标参考
- 支持数万级别的消息/秒吞吐量
- 极低的处理延迟(微秒级)
- 内存占用与 Prefetch 成正比
- 线性扩展性(多订阅者场景)
适用场景
✅ 推荐使用
- 内存消息队列
- 事件驱动架构
- 微服务内部通信
- 高并发异步任务分发
- 需要灵活 ACK 的短时间任务处理
❌ 不适用
- 持久化存储(纯内存,进程退出后丢失)
- 跨进程/网络通信(需配合消息队列中间件)
- 超长时间任务(建议使用专门的作业调度系统)
🛠️ 技术栈
- .NET 8.0 / 9.0 - 跨平台支持
- C# 12 - 最新语言特性
- System.Threading.Channels - 高性能异步队列
- Microsoft.Extensions.Logging.Abstractions - 日志抽象
📚 更多资源
相关链接
使用建议
- 小消息场景:Prefetch 可设较大值(50-100),提升吞吐量
- 大消息场景:Prefetch 设较小值(5-10),控制内存占用
- I/O 密集型:ConcurrencyLimit 可设较大值(10-50)
- CPU 密集型:ConcurrencyLimit 应等于或略大于 CPU 核心数
- 混合场景:根据实际业务情况调整参数
📄 许可证
本项目采用 MIT 许可证。
🤝 贡献
欢迎提交 Issue 和 Pull Request!
贡献指南
- Fork 项目
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 开启 Pull Request
项目作者: FlexibleAckDispatcher Team
当前版本: 1.0.0
最后更新: 2025-10-28
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net8.0
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.0)
-
net9.0
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated | |
|---|---|---|---|
| 1.0.0-proview.1.5 | 257 | 10/28/2025 | |
| 1.0.0-preview.24 | 142 | 10/31/2025 | |
| 1.0.0-preview.1.9 | 271 | 10/29/2025 | |
| 1.0.0-preview.1.8 | 256 | 10/29/2025 | |
| 1.0.0-preview.1.7 | 257 | 10/29/2025 | |
| 1.0.0-preview.1.6 | 263 | 10/28/2025 | |
| 1.0.0-preview.1.5 | 244 | 10/28/2025 | |
| 1.0.0-preview.1.4 | 241 | 10/28/2025 | |
| 1.0.0-preview.1.3 | 260 | 10/28/2025 | |
| 1.0.0-preview.1.2 | 245 | 10/28/2025 | |
| 1.0.0-preview.1.1 | 239 | 10/28/2025 | |
| 1.0.0-preview.1 | 254 | 10/28/2025 |