Sparkdo.Mediation 1.0.3

There is a newer prerelease version of this package available.
See the version list below for details.
dotnet add package Sparkdo.Mediation --version 1.0.3
                    
NuGet\Install-Package Sparkdo.Mediation -Version 1.0.3
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Sparkdo.Mediation" Version="1.0.3" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Sparkdo.Mediation" Version="1.0.3" />
                    
Directory.Packages.props
<PackageReference Include="Sparkdo.Mediation" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Sparkdo.Mediation --version 1.0.3
                    
#r "nuget: Sparkdo.Mediation, 1.0.3"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Sparkdo.Mediation@1.0.3
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Sparkdo.Mediation&version=1.0.3
                    
Install as a Cake Addin
#tool nuget:?package=Sparkdo.Mediation&version=1.0.3
                    
Install as a Cake Tool

Sparkdo.Mediation 使用文档

简介

Sparkdo.Mediation 是 Sparkdo 生态中的高性能中介者(Mediator)模式实现,提供了完整的 CQRS(命令查询职责分离)消息调度基础设施。该库通过预编译的中间件管道实现 O(1) 类型查找,支持命令发送、查询流式响应、通知事件广播等核心消息传递模式。

作为现代 .NET 应用的核心消息调度层,Sparkdo.Mediation 内置了高性能的上下文对象池、可扩展的中间件管道、OpenTelemetry 诊断支持以及源码生成器驱动的零反射 DI 注册。无论是简单的单体应用还是复杂的微服务架构,该库都能提供一致、高效的开发体验。

核心特性

1. 完整的 CQRS 支持

  • 命令(Request):支持有返回值和无返回值两种命令模式
  • 查询(StreamRequest):支持 IAsyncEnumerable<T> 流式响应
  • 通知(Notification):支持事件广播,多处理器并行/串行执行

2. 高性能设计

  • O(1) 类型查找:基于 FrozenDictionary<Type, MediatorDelegate> 的预编译管道查找
  • 上下文对象池MediatorContext 使用 ObjectPool 避免频繁 GC 分配
  • PoolingAsyncValueTaskMethodBuilder:利用 ValueTask 池化减少异步状态机分配
  • 预编译中间件MediatorMiddlewareCompiler 将中间件链编译为单一委托

3. 可扩展中间件管道

  • 灵活的中间件注册(前置/后置/附加)
  • 内置诊断中间件(MediatorDiagnosticMiddleware
  • 支持自定义中间件扩展

4. OpenTelemetry 集成

  • 通过 SparkdoMediatorActivitySource 导出 Activity 追踪
  • SemanticConventions 提供标准消息语义约定属性
  • MediatorDiagnosticEventListener 支持自定义诊断监听

5. 源码生成器驱动

  • 编译时自动发现消息类型和处理器
  • 零反射 DI 注册,减少启动时间和内存占用
  • [MediatorModule][GenericHandler] 特性支持

6. 功能特性系统

  • IFeatureCollection 提供中间件工厂可访问的功能特性容器
  • 支持对象池化特性的 PooledFeatureCollection

7. 多实例支持

  • 支持命名中介者实例(Keyed Services)
  • 每个实例独立配置中间件、处理器和服务

项目信息

  • 包名称: Sparkdo.Mediation
  • 依赖: Sparkdo.Mediation.Abstractions, Microsoft.Extensions.DependencyInjection, Microsoft.Extensions.ObjectPool, Microsoft.Extensions.Options
  • 标签: mediator, cqrs, middleware, pipeline, sparkdo

安装

Package Manager

Install-Package Sparkdo.Mediation

.NET CLI

dotnet add package Sparkdo.Mediation

PackageReference

<PackageReference Include="Sparkdo.Mediation" Version="1.0.0" />

快速入门

1. 定义消息和处理模型

using Sparkdo.Mediation;

// 命令 — 无返回值
public record CreateUserCommand(string Name, string Email) : IRequest;

// 命令 — 带返回值
public record GetUserQuery(int UserId) : IRequest<UserDto>;

// 流式查询
public record ExportDataQuery(string Filter) : IStreamRequest<DataRow>;

// 通知
public record UserCreatedEvent(int UserId, string Name) : INotification;

2. 实现处理器

using Sparkdo.Mediation;

// 命令处理器
public class CreateUserHandler : IRequestHandler<CreateUserCommand>
{
    private readonly AppDbContext _db;

    public CreateUserHandler(AppDbContext db) => _db = db;

    public async ValueTask HandleAsync(CreateUserCommand request, CancellationToken ct)
    {
        _db.Users.Add(new User { Name = request.Name, Email = request.Email });
        await _db.SaveChangesAsync(ct);
    }
}

// 查询处理器
public class GetUserHandler : IRequestHandler<GetUserQuery, UserDto>
{
    private readonly AppDbContext _db;

    public GetUserHandler(AppDbContext db) => _db = db;

    public async ValueTask<UserDto> HandleAsync(GetUserQuery request, CancellationToken ct)
    {
        var user = await _db.Users.FindAsync([request.UserId], ct);
        return user is null ? null! : new UserDto(user.Id, user.Name, user.Email);
    }
}

// 流式查询处理器
public class ExportDataHandler : IStreamRequestHandler<ExportDataQuery, DataRow>
{
    public async IAsyncEnumerable<DataRow> HandleAsync(
        ExportDataQuery request, [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var row in QueryData(request.Filter, ct))
            yield return row;
    }
}

// 通知处理器
public class UserCreatedEmailHandler : INotificationHandler<UserCreatedEvent>
{
    public async ValueTask HandleAsync(UserCreatedEvent notification, CancellationToken ct)
    {
        await SendWelcomeEmail(notification.Name, notification.Email);
    }
}

3. 注册服务

// 在 Program.cs 或 Startup 中注册
builder.Services.AddMediator(mediator =>
{
    mediator.AddHandler<CreateUserHandler>();
    mediator.AddHandler<GetUserHandler>();
    mediator.AddHandler<ExportDataHandler>();
    mediator.AddHandler<UserCreatedEmailHandler>();
});

4. 使用 Mediator

public class UserController
{
    private readonly IMediator _mediator;

    public UserController(IMediator mediator) => _mediator = mediator;

    // 发送无返回值命令
    public async Task CreateUser(string name, string email)
    {
        await _mediator.SendAsync(new CreateUserCommand(name, email));
    }

    // 发送带返回值命令
    public async Task<UserDto> GetUser(int id)
    {
        return await _mediator.SendAsync<UserDto>(new GetUserQuery(id));
    }

    // 流式查询
    public async IAsyncEnumerable<DataRow> ExportData(string filter)
    {
        await foreach (var row in _mediator.CreateStreamAsync(new ExportDataQuery(filter)))
            yield return row;
    }

    // 发布通知
    public async Task NotifyUserCreated(int userId, string name)
    {
        await _mediator.PublishAsync(new UserCreatedEvent(userId, name));
    }
}

核心功能

动态消息发送

// 运行时类型动态发送 — 在管道编译时确定的场景下使用
object message = GetMessageFromQueue();
var result = await ((ISender)mediator).SendAsync(message);

管道中间件

// 自定义中间件
public class LoggingMiddleware
{
    public static MediatorMiddlewareConfiguration Create()
        => MediatorMiddlewareConfiguration.Create(
            "Logging",
            (ctx, next) =>
            {
                Console.WriteLine($"处理消息: {ctx.MessageType.Name}");
                return next(ctx);
            });
}

// 注册中间件
builder.Services.AddMediator(mediator =>
{
    mediator.Use(LoggingMiddleware.Create());

    // 在特定中间件之前插入
    mediator.Use(AuditMiddleware.Create(), before: "Logging");

    // 在特定中间件之后插入
    mediator.Use(MetricsMiddleware.Create(), after: "Diagnostics");
});

通知发布模式

// 顺序模式(默认)— 处理程序依次执行
builder.Services.AddMediator(mediator =>
{
    mediator.ConfigureOptions(options =>
    {
        options.NotificationPublishMode = NotificationPublishMode.Sequential;
    });
});

// 并发模式 — 处理程序并行执行
builder.Services.AddMediator(mediator =>
{
    mediator.ConfigureOptions(options =>
    {
        options.NotificationPublishMode = NotificationPublishMode.Concurrent;
    });
});

处理器生命周期管理

builder.Services.AddMediator(mediator =>
{
    // 默认 Scoped 生命周期
    mediator.ConfigureOptions(options =>
    {
        options.ServiceLifetime = ServiceLifetime.Transient;
    });
});

实际应用示例

ASP.NET Core 完整集成

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// 注册 Mediator
builder.Services.AddMediator(mediator =>
{
    // 注册所有处理器(也可使用源码生成器自动注册)
    mediator.AddHandler<CreateOrderHandler>();
    mediator.AddHandler<GetOrderHandler>();
    mediator.AddHandler<OrderCreatedNotificationHandler>();

    // 添加诊断中间件
    mediator.Use(MediatorDiagnosticMiddleware.Create());

    // 配置 OpenTelemetry 监听
    mediator.ConfigureServices((sp, services) =>
    {
        services.AddSingleton<IMediatorDiagnosticEventListener>(
            new ActivityMediatorDiagnosticListener(sp.GetRequiredService<SparkdoMediatorActivitySource>()));
    });
});

var app = builder.Build();
app.MapPost("/orders", async (CreateOrderCommand cmd, IMediator mediator) =>
{
    await mediator.SendAsync(cmd);
    return Results.Created();
});

命令管道模式(Pipeline Behavior)

public class ValidationMiddleware<TRequest, TResponse> : IRequestHandler<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;
    private readonly IValidator<TRequest> _validator;

    public ValidationMiddleware(
        IRequestHandler<TRequest, TResponse> inner,
        IValidator<TRequest> validator)
    {
        _inner = inner;
        _validator = validator;
    }

    public async ValueTask<TResponse> HandleAsync(TRequest request, CancellationToken ct)
    {
        var result = await _validator.ValidateAsync(request, ct);
        if (!result.IsValid)
            throw new ValidationException(result.Errors);

        return await _inner.HandleAsync(request, ct);
    }
}

多实例 Mediator

// 注册多个命名中介者实例
builder.Services.AddMediator("Commands", mediator =>
{
    mediator.AddHandler<CreateUserHandler>();
});

builder.Services.AddMediator("Queries", mediator =>
{
    mediator.AddHandler<GetUserHandler>();
});

// 使用命名实例
public class CommandService([FromKeyedServices("Commands")] IMediator mediator)
{
    public async Task Execute(IRequest command) => await mediator.SendAsync(command);
}

API 参考

MediatorServiceCollectionExtensions

方法 说明
AddMediator() 添加默认(未命名)的中介者基础设施,返回 IMediatorHostBuilder
AddMediator(string name) 添加命名的中介者基础设施,返回 IMediatorHostBuilder

IMediatorHostBuilder

属性/方法 说明
Name 获取此中介者实例的名称
Services 用于注册处理器服务的 IServiceCollection
AddHandler<THandler>(Action<IMediatorHandlerDescriptor>?) 注册处理器类型
ConfigureOptions(Action<MediatorOptions>) 配置中介者选项
Use(MediatorMiddlewareConfiguration, string? before, string? after) 注册中间件

MediatorOptions

属性 默认值 说明
ServiceLifetime Scoped 处理器的默认服务生命周期
NotificationPublishMode Sequential 通知处理器的调度方式

NotificationPublishMode

说明
Sequential 处理器按顺序调用,前一个完成后才执行下一个
Concurrent 处理器通过 Task.WhenAll 并发调用

MediatorHandlerKind

说明
Request 无返回值的请求处理器
RequestResponse 带返回值的请求处理器
Stream 流式查询处理器
Notification 通知事件处理器

MediatorContext

每次消息调度的执行上下文:

属性/方法 说明
ServiceProvider 当前作用域的 IServiceProvider
Message 当前正在处理的消息实例
MessageType 消息的类型
ResponseType 预期的响应类型
CancellationToken 取消令牌
RequestServices 请求级别服务的快捷访问
Result 处理结果
Pools 对象池访问器

IMediatorPools

对象池访问接口,用于租用和归还 MediatorContext

方法 说明
RentContext() 从对象池租用一个上下文
ReturnContext(MediatorContext) 将上下文归还到对象池

诊断与可观测性

活动源

// 创建诊断活动源
var activitySource = new SparkdoMediatorActivitySource("MyApp.Mediator");

诊断事件监听器

public class MyDiagnosticListener : IMediatorDiagnosticEventListener
{
    public ValueTask BeforeHandler(MediatorContext context) { /* ... */ return default; }
    public ValueTask AfterHandler(MediatorContext context) { /* ... */ return default; }
    public ValueTask OnError(MediatorContext context, Exception ex) { /* ... */ return default; }
}

OpenTelemetry 约定属性

属性 说明
MessagingSystem Sparkdo.mediator 消息系统标识
OperationTypeSend send 发送操作
OperationTypePublish publish 发布操作

架构说明

IMediator (Mediator)
    ├── SendAsync → ISender
    │   ├── IRequest (void)
    │   ├── IRequest<T> (with response)
    │   └── IStreamRequest<T> (async stream)
    └── PublishAsync → IPublisher
        └── INotification (event)
MediatorRuntime (O(1) FrozenDictionary lookup)
    ├── MediatorDelegate[] (compiled pipeline for each message type)
    ├── MediatorContext pool (ObjectPool<MediatorContext>)
    └── MediatorMiddlewareCompiler (pipeline compilation)

测试

该项目通过 Sparkdo.Mediation.Tests 进行单元和集成测试:

dotnet test test/Sparkdo.Mediation.Tests/

测试覆盖:

  • 命令调度(MediatorDispatchTests
  • 处理器注册(AddHandlerTests
  • 上下文对象池(ContextPoolingTests
  • 中间件管道(MiddlewarePipelineTests
Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Sparkdo.Mediation:

Package Downloads
Sparkdo.Mediation.Http

Sparkdo 中介者模式 HTTP 扩展,将 CQRS 消息调度能力延伸到 HTTP 调用场景

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.4-preview.3 80 6/8/2026
1.0.4-preview.2 73 6/7/2026
1.0.4-preview.1 79 6/7/2026
1.0.3 168 6/6/2026
1.0.3-preview.1 126 5/17/2026