Sparkdo.Mediation
1.0.3
There is a newer prerelease version of this package available.
See the version list below for details.
See the version list below for details.
dotnet add package Sparkdo.Mediation --version 1.0.3
NuGet\Install-Package Sparkdo.Mediation -Version 1.0.3
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Sparkdo.Mediation" Version="1.0.3" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Sparkdo.Mediation" Version="1.0.3" />
<PackageReference Include="Sparkdo.Mediation" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Sparkdo.Mediation --version 1.0.3
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Sparkdo.Mediation, 1.0.3"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Sparkdo.Mediation@1.0.3
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Sparkdo.Mediation&version=1.0.3
#tool nuget:?package=Sparkdo.Mediation&version=1.0.3
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Sparkdo.Mediation 使用文档
简介
Sparkdo.Mediation 是 Sparkdo 生态中的高性能中介者(Mediator)模式实现,提供了完整的 CQRS(命令查询职责分离)消息调度基础设施。该库通过预编译的中间件管道实现 O(1) 类型查找,支持命令发送、查询流式响应、通知事件广播等核心消息传递模式。
作为现代 .NET 应用的核心消息调度层,Sparkdo.Mediation 内置了高性能的上下文对象池、可扩展的中间件管道、OpenTelemetry 诊断支持以及源码生成器驱动的零反射 DI 注册。无论是简单的单体应用还是复杂的微服务架构,该库都能提供一致、高效的开发体验。
核心特性
1. 完整的 CQRS 支持
- 命令(Request):支持有返回值和无返回值两种命令模式
- 查询(StreamRequest):支持
IAsyncEnumerable<T>流式响应 - 通知(Notification):支持事件广播,多处理器并行/串行执行
2. 高性能设计
- O(1) 类型查找:基于
FrozenDictionary<Type, MediatorDelegate>的预编译管道查找 - 上下文对象池:
MediatorContext使用ObjectPool避免频繁 GC 分配 - PoolingAsyncValueTaskMethodBuilder:利用
ValueTask池化减少异步状态机分配 - 预编译中间件:
MediatorMiddlewareCompiler将中间件链编译为单一委托
3. 可扩展中间件管道
- 灵活的中间件注册(前置/后置/附加)
- 内置诊断中间件(
MediatorDiagnosticMiddleware) - 支持自定义中间件扩展
4. OpenTelemetry 集成
- 通过
SparkdoMediatorActivitySource导出 Activity 追踪 SemanticConventions提供标准消息语义约定属性MediatorDiagnosticEventListener支持自定义诊断监听
5. 源码生成器驱动
- 编译时自动发现消息类型和处理器
- 零反射 DI 注册,减少启动时间和内存占用
[MediatorModule]和[GenericHandler]特性支持
6. 功能特性系统
IFeatureCollection提供中间件工厂可访问的功能特性容器- 支持对象池化特性的
PooledFeatureCollection
7. 多实例支持
- 支持命名中介者实例(Keyed Services)
- 每个实例独立配置中间件、处理器和服务
项目信息
- 包名称: Sparkdo.Mediation
- 依赖: Sparkdo.Mediation.Abstractions, Microsoft.Extensions.DependencyInjection, Microsoft.Extensions.ObjectPool, Microsoft.Extensions.Options
- 标签: mediator, cqrs, middleware, pipeline, sparkdo
安装
Package Manager
Install-Package Sparkdo.Mediation
.NET CLI
dotnet add package Sparkdo.Mediation
PackageReference
<PackageReference Include="Sparkdo.Mediation" Version="1.0.0" />
快速入门
1. 定义消息和处理模型
using Sparkdo.Mediation;
// 命令 — 无返回值
public record CreateUserCommand(string Name, string Email) : IRequest;
// 命令 — 带返回值
public record GetUserQuery(int UserId) : IRequest<UserDto>;
// 流式查询
public record ExportDataQuery(string Filter) : IStreamRequest<DataRow>;
// 通知
public record UserCreatedEvent(int UserId, string Name) : INotification;
2. 实现处理器
using Sparkdo.Mediation;
// 命令处理器
public class CreateUserHandler : IRequestHandler<CreateUserCommand>
{
private readonly AppDbContext _db;
public CreateUserHandler(AppDbContext db) => _db = db;
public async ValueTask HandleAsync(CreateUserCommand request, CancellationToken ct)
{
_db.Users.Add(new User { Name = request.Name, Email = request.Email });
await _db.SaveChangesAsync(ct);
}
}
// 查询处理器
public class GetUserHandler : IRequestHandler<GetUserQuery, UserDto>
{
private readonly AppDbContext _db;
public GetUserHandler(AppDbContext db) => _db = db;
public async ValueTask<UserDto> HandleAsync(GetUserQuery request, CancellationToken ct)
{
var user = await _db.Users.FindAsync([request.UserId], ct);
return user is null ? null! : new UserDto(user.Id, user.Name, user.Email);
}
}
// 流式查询处理器
public class ExportDataHandler : IStreamRequestHandler<ExportDataQuery, DataRow>
{
public async IAsyncEnumerable<DataRow> HandleAsync(
ExportDataQuery request, [EnumeratorCancellation] CancellationToken ct)
{
await foreach (var row in QueryData(request.Filter, ct))
yield return row;
}
}
// 通知处理器
public class UserCreatedEmailHandler : INotificationHandler<UserCreatedEvent>
{
public async ValueTask HandleAsync(UserCreatedEvent notification, CancellationToken ct)
{
await SendWelcomeEmail(notification.Name, notification.Email);
}
}
3. 注册服务
// 在 Program.cs 或 Startup 中注册
builder.Services.AddMediator(mediator =>
{
mediator.AddHandler<CreateUserHandler>();
mediator.AddHandler<GetUserHandler>();
mediator.AddHandler<ExportDataHandler>();
mediator.AddHandler<UserCreatedEmailHandler>();
});
4. 使用 Mediator
public class UserController
{
private readonly IMediator _mediator;
public UserController(IMediator mediator) => _mediator = mediator;
// 发送无返回值命令
public async Task CreateUser(string name, string email)
{
await _mediator.SendAsync(new CreateUserCommand(name, email));
}
// 发送带返回值命令
public async Task<UserDto> GetUser(int id)
{
return await _mediator.SendAsync<UserDto>(new GetUserQuery(id));
}
// 流式查询
public async IAsyncEnumerable<DataRow> ExportData(string filter)
{
await foreach (var row in _mediator.CreateStreamAsync(new ExportDataQuery(filter)))
yield return row;
}
// 发布通知
public async Task NotifyUserCreated(int userId, string name)
{
await _mediator.PublishAsync(new UserCreatedEvent(userId, name));
}
}
核心功能
动态消息发送
// 运行时类型动态发送 — 在管道编译时确定的场景下使用
object message = GetMessageFromQueue();
var result = await ((ISender)mediator).SendAsync(message);
管道中间件
// 自定义中间件
public class LoggingMiddleware
{
public static MediatorMiddlewareConfiguration Create()
=> MediatorMiddlewareConfiguration.Create(
"Logging",
(ctx, next) =>
{
Console.WriteLine($"处理消息: {ctx.MessageType.Name}");
return next(ctx);
});
}
// 注册中间件
builder.Services.AddMediator(mediator =>
{
mediator.Use(LoggingMiddleware.Create());
// 在特定中间件之前插入
mediator.Use(AuditMiddleware.Create(), before: "Logging");
// 在特定中间件之后插入
mediator.Use(MetricsMiddleware.Create(), after: "Diagnostics");
});
通知发布模式
// 顺序模式(默认)— 处理程序依次执行
builder.Services.AddMediator(mediator =>
{
mediator.ConfigureOptions(options =>
{
options.NotificationPublishMode = NotificationPublishMode.Sequential;
});
});
// 并发模式 — 处理程序并行执行
builder.Services.AddMediator(mediator =>
{
mediator.ConfigureOptions(options =>
{
options.NotificationPublishMode = NotificationPublishMode.Concurrent;
});
});
处理器生命周期管理
builder.Services.AddMediator(mediator =>
{
// 默认 Scoped 生命周期
mediator.ConfigureOptions(options =>
{
options.ServiceLifetime = ServiceLifetime.Transient;
});
});
实际应用示例
ASP.NET Core 完整集成
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// 注册 Mediator
builder.Services.AddMediator(mediator =>
{
// 注册所有处理器(也可使用源码生成器自动注册)
mediator.AddHandler<CreateOrderHandler>();
mediator.AddHandler<GetOrderHandler>();
mediator.AddHandler<OrderCreatedNotificationHandler>();
// 添加诊断中间件
mediator.Use(MediatorDiagnosticMiddleware.Create());
// 配置 OpenTelemetry 监听
mediator.ConfigureServices((sp, services) =>
{
services.AddSingleton<IMediatorDiagnosticEventListener>(
new ActivityMediatorDiagnosticListener(sp.GetRequiredService<SparkdoMediatorActivitySource>()));
});
});
var app = builder.Build();
app.MapPost("/orders", async (CreateOrderCommand cmd, IMediator mediator) =>
{
await mediator.SendAsync(cmd);
return Results.Created();
});
命令管道模式(Pipeline Behavior)
public class ValidationMiddleware<TRequest, TResponse> : IRequestHandler<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
private readonly IRequestHandler<TRequest, TResponse> _inner;
private readonly IValidator<TRequest> _validator;
public ValidationMiddleware(
IRequestHandler<TRequest, TResponse> inner,
IValidator<TRequest> validator)
{
_inner = inner;
_validator = validator;
}
public async ValueTask<TResponse> HandleAsync(TRequest request, CancellationToken ct)
{
var result = await _validator.ValidateAsync(request, ct);
if (!result.IsValid)
throw new ValidationException(result.Errors);
return await _inner.HandleAsync(request, ct);
}
}
多实例 Mediator
// 注册多个命名中介者实例
builder.Services.AddMediator("Commands", mediator =>
{
mediator.AddHandler<CreateUserHandler>();
});
builder.Services.AddMediator("Queries", mediator =>
{
mediator.AddHandler<GetUserHandler>();
});
// 使用命名实例
public class CommandService([FromKeyedServices("Commands")] IMediator mediator)
{
public async Task Execute(IRequest command) => await mediator.SendAsync(command);
}
API 参考
MediatorServiceCollectionExtensions
| 方法 | 说明 |
|---|---|
AddMediator() |
添加默认(未命名)的中介者基础设施,返回 IMediatorHostBuilder |
AddMediator(string name) |
添加命名的中介者基础设施,返回 IMediatorHostBuilder |
IMediatorHostBuilder
| 属性/方法 | 说明 |
|---|---|
Name |
获取此中介者实例的名称 |
Services |
用于注册处理器服务的 IServiceCollection |
AddHandler<THandler>(Action<IMediatorHandlerDescriptor>?) |
注册处理器类型 |
ConfigureOptions(Action<MediatorOptions>) |
配置中介者选项 |
Use(MediatorMiddlewareConfiguration, string? before, string? after) |
注册中间件 |
MediatorOptions
| 属性 | 默认值 | 说明 |
|---|---|---|
ServiceLifetime |
Scoped |
处理器的默认服务生命周期 |
NotificationPublishMode |
Sequential |
通知处理器的调度方式 |
NotificationPublishMode
| 值 | 说明 |
|---|---|
Sequential |
处理器按顺序调用,前一个完成后才执行下一个 |
Concurrent |
处理器通过 Task.WhenAll 并发调用 |
MediatorHandlerKind
| 值 | 说明 |
|---|---|
Request |
无返回值的请求处理器 |
RequestResponse |
带返回值的请求处理器 |
Stream |
流式查询处理器 |
Notification |
通知事件处理器 |
MediatorContext
每次消息调度的执行上下文:
| 属性/方法 | 说明 |
|---|---|
ServiceProvider |
当前作用域的 IServiceProvider |
Message |
当前正在处理的消息实例 |
MessageType |
消息的类型 |
ResponseType |
预期的响应类型 |
CancellationToken |
取消令牌 |
RequestServices |
请求级别服务的快捷访问 |
Result |
处理结果 |
Pools |
对象池访问器 |
IMediatorPools
对象池访问接口,用于租用和归还 MediatorContext:
| 方法 | 说明 |
|---|---|
RentContext() |
从对象池租用一个上下文 |
ReturnContext(MediatorContext) |
将上下文归还到对象池 |
诊断与可观测性
活动源
// 创建诊断活动源
var activitySource = new SparkdoMediatorActivitySource("MyApp.Mediator");
诊断事件监听器
public class MyDiagnosticListener : IMediatorDiagnosticEventListener
{
public ValueTask BeforeHandler(MediatorContext context) { /* ... */ return default; }
public ValueTask AfterHandler(MediatorContext context) { /* ... */ return default; }
public ValueTask OnError(MediatorContext context, Exception ex) { /* ... */ return default; }
}
OpenTelemetry 约定属性
| 属性 | 值 | 说明 |
|---|---|---|
MessagingSystem |
Sparkdo.mediator |
消息系统标识 |
OperationTypeSend |
send |
发送操作 |
OperationTypePublish |
publish |
发布操作 |
架构说明
IMediator (Mediator)
├── SendAsync → ISender
│ ├── IRequest (void)
│ ├── IRequest<T> (with response)
│ └── IStreamRequest<T> (async stream)
└── PublishAsync → IPublisher
└── INotification (event)
MediatorRuntime (O(1) FrozenDictionary lookup)
├── MediatorDelegate[] (compiled pipeline for each message type)
├── MediatorContext pool (ObjectPool<MediatorContext>)
└── MediatorMiddlewareCompiler (pipeline compilation)
测试
该项目通过 Sparkdo.Mediation.Tests 进行单元和集成测试:
dotnet test test/Sparkdo.Mediation.Tests/
测试覆盖:
- 命令调度(
MediatorDispatchTests) - 处理器注册(
AddHandlerTests) - 上下文对象池(
ContextPoolingTests) - 中间件管道(
MiddlewarePipelineTests)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net10.0
- JetBrains.Annotations (>= 2025.2.4)
- Microsoft.Extensions.Configuration (>= 10.0.8)
- Microsoft.Extensions.Configuration.CommandLine (>= 10.0.8)
- Microsoft.Extensions.Configuration.EnvironmentVariables (>= 10.0.8)
- Microsoft.Extensions.Configuration.FileExtensions (>= 10.0.8)
- Microsoft.Extensions.Configuration.Json (>= 10.0.8)
- Microsoft.Extensions.Configuration.UserSecrets (>= 10.0.8)
- Microsoft.Extensions.DependencyInjection (>= 10.0.8)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.8)
- Microsoft.Extensions.DependencyModel (>= 10.0.8)
- Microsoft.Extensions.FileProviders.Physical (>= 10.0.8)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Localization (>= 10.0.8)
- Microsoft.Extensions.Logging (>= 10.0.8)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.8)
- Microsoft.Extensions.ObjectPool (>= 10.0.8)
- Microsoft.Extensions.Options (>= 10.0.8)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.8)
- Microsoft.Extensions.Options.DataAnnotations (>= 10.0.8)
- Microsoft.Extensions.Primitives (>= 10.0.8)
- Nito.AsyncEx.Context (>= 5.1.2)
- Sparkdo.Core (>= 1.0.3)
- Sparkdo.Mediation.Abstractions (>= 1.0.3)
- System.Linq.Dynamic.Core (>= 1.7.2)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Sparkdo.Mediation:
| Package | Downloads |
|---|---|
|
Sparkdo.Mediation.Http
Sparkdo 中介者模式 HTTP 扩展,将 CQRS 消息调度能力延伸到 HTTP 调用场景 |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.4-preview.3 | 80 | 6/8/2026 |
| 1.0.4-preview.2 | 73 | 6/7/2026 |
| 1.0.4-preview.1 | 79 | 6/7/2026 |
| 1.0.3 | 168 | 6/6/2026 |
| 1.0.3-preview.1 | 126 | 5/17/2026 |