Merq 3.0.0-alpha

This is a prerelease version of Merq.
.NET CLI:
dotnet add package Merq --version 3.0.0-alpha

Package Manager Console:
NuGet\Install-Package Merq -Version 3.0.0-alpha
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference:
<PackageReference Include="Merq" Version="3.0.0-alpha" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Central Package Management (CPM):
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
Directory.Packages.props:
<PackageVersion Include="Merq" Version="3.0.0-alpha" />
Project file:
<PackageReference Include="Merq" />

Paket:
paket add Merq --version 3.0.0-alpha

F# Interactive and Polyglot Notebooks:
#r "nuget: Merq, 3.0.0-alpha"
The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

Cake:
Install Merq as a Cake Addin:
#addin nuget:?package=Merq&version=3.0.0-alpha&prerelease
Install Merq as a Cake Tool:
#tool nuget:?package=Merq&version=3.0.0-alpha&prerelease

Mercury: messenger of the Roman gods

Mercury > Merq-ry > Merq

Merq brings the Message Bus pattern together with a command-oriented interface for an extensible and decoupled in-process application architecture.

These patterns are well established in microservices and service-oriented architectures, but their benefits apply to apps too, especially extensible ones where multiple teams contribute extensions that are composed at run-time.

This is also commonly used as a Mediator pattern, such as in MediatR and other libraries.

The resulting improved decoupling between components makes it easier to evolve them independently, while improving discoverability of available commands and events. You can see this approach applied in the real world in VSCode commands and various events such as window events. Clearly, in the case of VSCode, everything is in-process, but the benefits of a clean and predictable API are pretty obvious.

Merq provides the same capabilities for .NET apps.

Events

Events can be any type; there are no restrictions and no interfaces you must implement. Nowadays, C# record types are a perfect fit for event data types. An example event could be a one-liner such as:

public record ItemShipped(string Id, DateTimeOffset Date);

The events-based API surface on the message bus is simple enough:

public interface IMessageBus
{
    void Notify<TEvent>(TEvent e);
    IObservable<TEvent> Observe<TEvent>();
}

By relying on IObservable<TEvent>, Merq integrates seamlessly with more powerful event-driven handling via System.Reactive or the more lightweight RxFree. Subscribing to events with either of those packages is trivial:

public class CustomerViewModel : IDisposable
{
    readonly IDisposable subscription;

    // constructor may use DI to get the dependency
    public CustomerViewModel(IMessageBus bus)
    {
        subscription = bus.Observe<ItemShipped>().Subscribe(OnItemShipped);
    }

    void OnItemShipped(ItemShipped e)
    {
        // Refresh item status
    }

    // stop observing when the view model goes away
    public void Dispose() => subscription.Dispose();
}

In addition to event producers just invoking Notify, they can also be implemented as IObservable<TEvent> directly, which is useful when the producer is itself an observable sequence.

Both features integrate seamlessly and leverage all the power of Reactive Extensions.
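
To make the Notify/Observe contract concrete, here is a minimal sketch of the same shape using only the base class library. TinyEventBus, ActionObserver and EventsDemo are hypothetical names invented for this illustration. This is not how Merq implements its bus: there is no thread safety, no producer support and no matching on base types.

```csharp
using System;
using System.Collections.Generic;

// Event definition, as in the text above.
public record ItemShipped(string Id, DateTimeOffset Date);

// Hypothetical stand-in for the events half of IMessageBus (not Merq's implementation).
public class TinyEventBus
{
    readonly Dictionary<Type, List<object>> observers = new();

    public void Notify<TEvent>(TEvent e)
    {
        if (!observers.TryGetValue(typeof(TEvent), out var list)) return;
        // Iterate over a copy so observers can unsubscribe while being notified.
        foreach (var observer in list.ToArray())
            ((IObserver<TEvent>)observer).OnNext(e);
    }

    public IObservable<TEvent> Observe<TEvent>() => new EventStream<TEvent>(this);

    class EventStream<TEvent> : IObservable<TEvent>
    {
        readonly TinyEventBus bus;
        public EventStream(TinyEventBus bus) => this.bus = bus;

        public IDisposable Subscribe(IObserver<TEvent> observer)
        {
            if (!bus.observers.TryGetValue(typeof(TEvent), out var list))
                bus.observers[typeof(TEvent)] = list = new List<object>();
            list.Add(observer);
            return new Unsubscriber(() => list.Remove(observer));
        }
    }

    class Unsubscriber : IDisposable
    {
        readonly Action dispose;
        public Unsubscriber(Action dispose) => this.dispose = dispose;
        public void Dispose() => dispose();
    }
}

// Plain IObserver<T> adapter, so the sketch needs neither System.Reactive nor RxFree.
public class ActionObserver<T> : IObserver<T>
{
    readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) => this.onNext = onNext;
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class EventsDemo
{
    // Returns the ids seen by the subscriber while it was subscribed.
    public static List<string> Run()
    {
        var bus = new TinyEventBus();
        var seen = new List<string>();

        var subscription = bus.Observe<ItemShipped>()
            .Subscribe(new ActionObserver<ItemShipped>(e => seen.Add(e.Id)));

        bus.Notify(new ItemShipped("A-1", DateTimeOffset.UtcNow));
        subscription.Dispose();
        bus.Notify(new ItemShipped("A-2", DateTimeOffset.UtcNow)); // no longer observed

        return seen;
    }
}
```

Calling EventsDemo.Run() yields a list containing only "A-1", because the second event is published after the subscription was disposed.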

Commands

Commands can also be any type, and C# records make for concise definitions:

record CancelOrder(string OrderId) : IAsyncCommand;

Unlike events, command messages need to signal the invocation style they require for execution:

Scenario | Interface | Invocation
void synchronous command | ICommand | IMessageBus.Execute(command)
value-returning synchronous command | ICommand<TResult> | var result = IMessageBus.Execute(command)
void asynchronous command | IAsyncCommand | await IMessageBus.ExecuteAsync(command)
value-returning asynchronous command | IAsyncCommand<TResult> | var result = await IMessageBus.ExecuteAsync(command)
async stream command | IStreamCommand<TResult> | await foreach(var item in IMessageBus.ExecuteStream(command))

The sample command shown before can be executed using the following code:

// perhaps a method invoked when a user 
// clicks/taps a Cancel button next to an order
async Task OnCancel(string orderId)
{
    await bus.ExecuteAsync(new CancelOrder(orderId), CancellationToken.None);
    // refresh UI for new state.
}

An example of a synchronous command could be:

// Command declaration
record SignOut() : ICommand;

// Command invocation
void OnSignOut() => bus.Execute(new SignOut());

// or alternatively, for void commands that have no additional data:
void OnSignOut() => bus.Execute<SignOut>();

The marker interfaces on the command messages drive the compiler to only allow the right invocation style on the message bus, as defined by the command author:

public interface IMessageBus
{
    // sync void
    void Execute(ICommand command);
    // sync value-returning
    TResult Execute<TResult>(ICommand<TResult> command);
    // async void
    Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation);
    // async value-returning
    Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, CancellationToken cancellation);
    // async stream
    IAsyncEnumerable<TResult> ExecuteStream<TResult>(IStreamCommand<TResult> command, CancellationToken cancellation);
}
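
The following sketch shows how marker interfaces steer overload resolution at compile time. The ICommand and IAsyncCommand declarations here are empty stand-ins that mirror the shapes above, and LoggingBus and MarkerDemo are hypothetical names. The toy bus only records which overload was chosen; it does not dispatch to handlers the way Merq does.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the marker interfaces; the real ones ship with Merq.
public interface ICommand { }
public interface IAsyncCommand { }

public record SignOut() : ICommand;
public record CancelOrder(string OrderId) : IAsyncCommand;

// Toy bus that only logs the chosen overload (no handler lookup).
public class LoggingBus
{
    public List<string> Log { get; } = new();

    public void Execute(ICommand command)
        => Log.Add($"sync:{command.GetType().Name}");

    public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation)
    {
        Log.Add($"async:{command.GetType().Name}");
        return Task.CompletedTask;
    }
}

public static class MarkerDemo
{
    public static async Task<string> RunAsync()
    {
        var bus = new LoggingBus();

        bus.Execute(new SignOut());
        await bus.ExecuteAsync(new CancelOrder("42"), CancellationToken.None);

        // Neither line below compiles, because the argument lacks the required marker:
        // bus.Execute(new CancelOrder("42"));
        // await bus.ExecuteAsync(new SignOut(), CancellationToken.None);

        return string.Join(",", bus.Log);
    }
}
```

MarkerDemo.RunAsync() completes with "sync:SignOut,async:CancelOrder". Uncommenting either of the two commented lines turns the mismatch into a build error rather than a run-time surprise.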

For example, to create a value-returning async command that retrieves some value, you would have:

record FindDocuments(string Filter) : IAsyncCommand<IEnumerable<string>>;

class FindDocumentsHandler : IAsyncCommandHandler<FindDocuments, IEnumerable<string>>
{
    public bool CanExecute(FindDocuments command) => !string.IsNullOrEmpty(command.Filter);

    public Task<IEnumerable<string>> ExecuteAsync(FindDocuments command, CancellationToken cancellation)
    {
        // evaluate command.Filter across all documents and return matches
        throw new NotImplementedException();
    }
}

To execute such a command, the only execute method the compiler will allow is:

IEnumerable<string> files = await bus.ExecuteAsync(new FindDocuments("*.json"), CancellationToken.None);

If the consumer tries to use Execute, the compiler will complain that the command does not implement ICommand<TResult>, which is the synchronous version of the marker interface.
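
As a complete, compilable illustration of a value-returning async handler, the sketch below fills in the elided body with an assumed in-memory document list and a simplified suffix match. The two interfaces are hypothetical stand-ins for the Merq ones, and FindDocumentsDemo calls the handler directly instead of going through a bus.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the Merq interfaces used above.
public interface IAsyncCommand<TResult> { }

public interface IAsyncCommandHandler<in TCommand, TResult>
    where TCommand : IAsyncCommand<TResult>
{
    bool CanExecute(TCommand command);
    Task<TResult> ExecuteAsync(TCommand command, CancellationToken cancellation);
}

public record FindDocuments(string Filter) : IAsyncCommand<IEnumerable<string>>;

public class FindDocumentsHandler : IAsyncCommandHandler<FindDocuments, IEnumerable<string>>
{
    // Assumed data: a real handler would query a file system or an index.
    readonly string[] documents = { "orders.json", "readme.md", "settings.json" };

    public bool CanExecute(FindDocuments command) => !string.IsNullOrEmpty(command.Filter);

    public Task<IEnumerable<string>> ExecuteAsync(FindDocuments command, CancellationToken cancellation)
    {
        cancellation.ThrowIfCancellationRequested();

        // Simplification: only "*.ext" style filters are handled, matched by suffix.
        var suffix = command.Filter.TrimStart('*');
        IEnumerable<string> matches = documents
            .Where(d => d.EndsWith(suffix, StringComparison.OrdinalIgnoreCase))
            .ToList();

        return Task.FromResult(matches);
    }
}

public static class FindDocumentsDemo
{
    public static async Task<string[]> RunAsync()
    {
        var handler = new FindDocumentsHandler();
        var command = new FindDocuments("*.json");

        // Validate first, then execute.
        if (!handler.CanExecute(command))
            return Array.Empty<string>();

        var files = await handler.ExecuteAsync(command, CancellationToken.None);
        return files.ToArray();
    }
}
```

With the assumed document list, FindDocumentsDemo.RunAsync() produces "orders.json" and "settings.json".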

While these marker interfaces on the command messages might seem unnecessary, they are actually quite important. They solve a key problem that execution abstractions face: whether a command execution is synchronous or asynchronous (as well as void or value-returning) should not be abstracted away. Otherwise you can end up in two common anti-patterns, known as sync over async and async over sync (see, for example, the async guidelines for ASP.NET).

Likewise, mistakes cannot be made when implementing the handler, since the handler interfaces define constraints on what the commands must implement:

// sync
public interface ICommandHandler<in TCommand> : ... where TCommand : ICommand;
public interface ICommandHandler<in TCommand, out TResult> : ... where TCommand : ICommand<TResult>;

// async
public interface IAsyncCommandHandler<in TCommand> : ... where TCommand : IAsyncCommand;
public interface IAsyncCommandHandler<in TCommand, TResult> : ... where TCommand : IAsyncCommand<TResult>;

// async stream
public interface IStreamCommandHandler<in TCommand, out TResult> : ... where TCommand : IStreamCommand<TResult>;

This design choice also makes it impossible to end up executing a command implementation improperly.

In addition to execution, the IMessageBus also provides a mechanism to determine if a command has a registered handler at all via the CanHandle<T> method as well as a validation mechanism via CanExecute<T>, as shown above in the FindDocumentsHandler example.

Commands can notify new events, and event observers/subscribers can in turn execute commands.

Async Streams

For .NET 6+ apps, Merq also supports async streams as a command invocation style. This is useful when a command produces a potentially large number of results and the consumer wants to process them as they arrive, rather than waiting for the entire sequence to be produced.

For example, the FindDocuments command above could be implemented as an async stream command instead:

record FindDocuments(string Filter) : IStreamCommand<string>;

class FindDocumentsHandler : IStreamCommandHandler<FindDocuments, string>
{
    public bool CanExecute(FindDocuments command) => !string.IsNullOrEmpty(command.Filter);

    public async IAsyncEnumerable<string> ExecuteAsync(FindDocuments command, [EnumeratorCancellation] CancellationToken cancellation)
    {
        await foreach (var file in FindFilesAsync(command.Filter, cancellation))
            yield return file;
    }
}

To execute such a command, the only execute method the compiler will allow is:

await foreach (var file in bus.ExecuteStream(new FindDocuments("*.json"), CancellationToken.None))
    Console.WriteLine(file);
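
The streaming behavior itself can be tried without Merq. The sketch below uses only the base class library: FindFilesAsync is a hypothetical producer standing in for a stream command handler, and TakeFirstAsync consumes results as they arrive, then cancels the rest of the enumeration after the first match.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class StreamDemo
{
    // Hypothetical producer; the file names are made up for the example.
    public static async IAsyncEnumerable<string> FindFilesAsync(
        string filter,
        [EnumeratorCancellation] CancellationToken cancellation = default)
    {
        var suffix = filter.TrimStart('*');
        foreach (var file in new[] { "a.json", "b.txt", "c.json" })
        {
            cancellation.ThrowIfCancellationRequested();
            await Task.Yield(); // simulate asynchronous work between results
            if (file.EndsWith(suffix, StringComparison.Ordinal))
                yield return file;
        }
    }

    // Processes items one by one and cancels the producer after the first match.
    public static async Task<List<string>> TakeFirstAsync(string filter)
    {
        using var cts = new CancellationTokenSource();
        var results = new List<string>();
        try
        {
            // WithCancellation flows the token into the [EnumeratorCancellation] parameter.
            await foreach (var file in FindFilesAsync(filter).WithCancellation(cts.Token))
            {
                results.Add(file);
                cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the producer observed the cancellation on its next step.
        }
        return results;
    }
}
```

StreamDemo.TakeFirstAsync("*.json") returns a list containing only "a.json". The producer never reaches "c.json" because the token is cancelled as soon as the first result has been consumed.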

Analyzers and Code Fixes

Beyond the compiler complaining, Merq also provides a set of analyzers and code fixes to learn the patterns and avoid common mistakes. For example, if you created a simple record to use as a command, such as:

public record Echo(string Message);

And then tried to implement a command handler for it:

public class EchoHandler : ICommandHandler<Echo>
{
}

the compiler would immediately complain about various constraints and interfaces that aren't satisfied due to the requirements on the Echo type itself. For a seasoned Merq developer, this is a no-brainer, but for new developers, it can be a bit puzzling:

compiler warnings screenshot

A code fix is provided to automatically implement the required interfaces in this case:

code fix to implement ICommand screenshot

Likewise, if a consumer attempted to invoke the above Echo command asynchronously (known as the async over sync anti-pattern), they would get a somewhat unintuitive compiler error:

error executing sync command as async

But the second error is more helpful, since it points to the actual problem, and a code fix can be applied to resolve it:

code fix for executing sync command as async

The same analyzers and code fixes are provided for the opposite anti-pattern, known as sync over async, where an asynchronous command is executed synchronously.

Hosting

The default implementation of the message bus interface IMessageBus has no external dependencies and can be instantiated via the MessageBus constructor directly by an application host.

The bus locates command handlers and event producers via the passed-in IServiceProvider instance in the constructor:

var bus = new MessageBus(serviceProvider);

// execute a command
bus.Execute(new MyCommand());

// observe an event from the bus
bus.Observe<MyEvent>().Subscribe(e => Console.WriteLine(e.Message));
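
To illustrate the idea of a bus that finds handlers through an IServiceProvider, here is a small self-contained sketch. Every type in it is a hypothetical stand-in: MapServiceProvider replaces a real DI container, and TinyCommandBus resolves ICommandHandler<TCommand> from the compile-time command type, which is a simplification and not necessarily how Merq's MessageBus performs the lookup.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for Merq's ICommand and ICommandHandler<T>.
public interface ICommand { }

public interface ICommandHandler<in TCommand> where TCommand : ICommand
{
    void Execute(TCommand command);
}

public record Ping(string Text) : ICommand;

public class PingHandler : ICommandHandler<Ping>
{
    public List<string> Received { get; } = new();
    public void Execute(Ping command) => Received.Add(command.Text);
}

// Dictionary-backed IServiceProvider, standing in for a real container.
public class MapServiceProvider : IServiceProvider
{
    readonly Dictionary<Type, object> services = new();

    public MapServiceProvider Add<TService>(TService instance) where TService : notnull
    {
        services[typeof(TService)] = instance;
        return this;
    }

    public object? GetService(Type serviceType)
        => services.TryGetValue(serviceType, out var service) ? service : null;
}

// Toy bus: asks the provider for the handler registered under its main interface.
public class TinyCommandBus
{
    readonly IServiceProvider services;
    public TinyCommandBus(IServiceProvider services) => this.services = services;

    public bool CanHandle<TCommand>() where TCommand : ICommand
        => services.GetService(typeof(ICommandHandler<TCommand>)) != null;

    public void Execute<TCommand>(TCommand command) where TCommand : ICommand
    {
        var handler = services.GetService(typeof(ICommandHandler<TCommand>)) as ICommandHandler<TCommand>
            ?? throw new InvalidOperationException($"No handler registered for {typeof(TCommand).Name}");
        handler.Execute(command);
    }
}

public static class HostingDemo
{
    public static string Run()
    {
        var handler = new PingHandler();
        // Register the handler under its interface, not its concrete type.
        var provider = new MapServiceProvider().Add<ICommandHandler<Ping>>(handler);
        var bus = new TinyCommandBus(provider);

        bus.Execute(new Ping("hello"));
        return string.Join(",", handler.Received);
    }
}
```

HostingDemo.Run() returns "hello". The point of the sketch is the registration key: the handler is only found because it was registered as ICommandHandler<Ping>.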

Merq integrates out of the box with dependency injection for .NET, making it straightforward to properly register the bus and all command handlers and event producers.

var builder = WebApplication.CreateBuilder(args);
...
builder.Services.AddMessageBus();

All command handlers and event producers need to be registered with the services collection as usual, using the main interface for the component, such as ICommandHandler<T> and IObservable<TEvent>. In addition, if you use the IMessageBus.CanExecute<T> method, handlers need to also be registered with the ICanExecute<T> interface.

NOTE: Merq makes no assumptions about the lifetime of the registered components, so it's up to the consumer to register them with the desired lifetime.

To drastically simplify registration of handlers and producers, we recommend the Devlooped.Extensions.DependencyInjection package. It provides a simple attribute-based mechanism that emits, at compile time, the required service registrations for all types marked with the provided [Service] attribute. The attribute also allows setting the component lifetime, such as [Service(ServiceLifetime.Transient)] (the default lifetime is ServiceLifetime.Singleton for this source generator-based package).

This lets you mark all command handlers and event producers as [Service] and then register them all with a single line of code:

builder.Services.AddServices();

This package emits all registrations at compile-time using source generators, so run-time performance is not affected at all.

Telemetry and Monitoring

The core implementation of the IMessageBus is instrumented with ActivitySource and Meter, providing out of the box support for OpenTelemetry-based monitoring, as well as for the dotnet-trace and dotnet-counters tools.

For example, to export telemetry using OpenTelemetry:

using var tracer = Sdk
    .CreateTracerProviderBuilder()
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ConsoleApp"))
    .AddSource(source.Name)
    .AddSource("Merq")
    .AddConsoleExporter()
    .AddZipkinExporter()
    .AddAzureMonitorTraceExporter(o => o.ConnectionString = config["AppInsights"])
    .Build();
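
For a quick in-process check without any exporter, an ActivityListener from System.Diagnostics can subscribe to a source by name. The sketch below is an assumption-laden illustration: it creates its own ActivitySource named "Merq" (the name used in the exporter setup above) in place of the real bus, and the activity name "Execute/Ping" is made up.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class TracingDemo
{
    // Returns the operation names of the activities that were observed stopping.
    public static List<string> Run()
    {
        var stopped = new List<string>();

        using var listener = new ActivityListener
        {
            // Only listen to the source named "Merq".
            ShouldListenTo = source => source.Name == "Merq",
            // Record everything; without a sampling decision no Activity is created.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStopped = activity => stopped.Add(activity.OperationName),
        };
        ActivitySource.AddActivityListener(listener);

        // Stand-in for the instrumented bus (hypothetical source and activity name).
        using var source = new ActivitySource("Merq");
        using (var activity = source.StartActivity("Execute/Ping"))
        {
            activity?.SetTag("demo", true);
        }

        return stopped;
    }
}
```

TracingDemo.Run() returns a list containing "Execute/Ping". In a real app the listener would be registered once at startup and the activities would come from the bus itself.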

Collecting traces via dotnet-trace:

dotnet trace collect --name [PROCESS_NAME] --providers="Microsoft-Diagnostics-DiagnosticSource:::FilterAndPayloadSpecs=[AS]Merq,System.Diagnostics.Metrics:::Metrics=Merq"

Monitoring metrics via dotnet-counters:

dotnet counters monitor --process-id [PROCESS_ID] --counters Merq

Example rendering from the included sample console app:

dotnet-counters screenshot

Performance

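The performance of Merq is on par with the best implementations of the same pattern, for example MediatR. Note that it sacrifices (minimally) performance for fewer allocations, most notably on the synchronous command execution path: in the run below, PingMerq allocates 184 B against 336 B for PingMediatR, at a mean of about 304 ns against about 167 ns.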


BenchmarkDotNet v0.13.12, Windows 11 (10.0.22622.575)
Intel Core i9-10900T CPU 1.90GHz, 1 CPU, 20 logical and 10 physical cores
.NET SDK 9.0.100-preview.2.24074.1
  [Host]     : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2
  DefaultJob : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2


Method | Mean | Error | StdDev | Median | Gen0 | Allocated
PingMerq | 303.8 ns | 6.05 ns | 15.84 ns | 302.7 ns | 0.0172 | 184 B
PingMerqAsync | 294.7 ns | 5.35 ns | 5.95 ns | 295.2 ns | 0.0248 | 264 B
PingMediatR | 166.8 ns | 3.15 ns | 6.99 ns | 164.2 ns | 0.0319 | 336 B

Abstractions


The Merq.Abstractions package contains just the interfaces for Merq, for scenarios where messages are shared across multiple assemblies or defined separately from the main app host.

Duck Typing

Being able to loosely couple both events (and their consumers) and command execution (from their command handler implementations) is a key feature of Merq. To take this decoupling to the extreme, Merq offers a capability similar to the one TypeScript/JavaScript enables in VSCode: you can just copy/paste an event/command definition as source into your assembly, and perform the regular operations with it (like Observe an event and Execute a command), in a "duck typing" manner.

As long as the types' full names match, the conversion will happen automatically. Since this functionality isn't required in many scenarios, and since there are myriad ways to implement such an object mapping functionality, the Merq.Core package only provides the hooks to enable this, but does not provide any built-in implementation for it. In other words, no duck typing is performed by default.
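
To show the kind of structural conversion involved, the sketch below copies an event between two identically shaped records through a JSON round trip with System.Text.Json. This is an illustrative technique only, not what Merq or any of its packages do. The Producer and Consumer container classes are hypothetical and stand in for two separate assemblies; in the real scenario both types would share the same full name.

```csharp
using System;
using System.Text.Json;

// Stand-in for the assembly that publishes the event.
public static class Producer
{
    public record ItemShipped(string Id, DateTimeOffset Date);
}

// Stand-in for an assembly holding a copy/pasted definition of the same event.
public static class Consumer
{
    public record ItemShipped(string Id, DateTimeOffset Date);
}

public static class DuckTypingDemo
{
    // Structural conversion: serialize the source, deserialize as the target shape.
    public static TTarget Convert<TTarget>(object source) where TTarget : class
        => JsonSerializer.Deserialize<TTarget>(JsonSerializer.Serialize(source, source.GetType()))
           ?? throw new InvalidOperationException("Conversion produced null");

    // Returns true when the converted copy carries the same data as the original.
    public static bool Run()
    {
        var original = new Producer.ItemShipped(
            "A-1", new DateTimeOffset(2024, 1, 29, 0, 0, 0, TimeSpan.Zero));

        var converted = Convert<Consumer.ItemShipped>(original);

        return converted.Id == original.Id && converted.Date == original.Date;
    }
}
```

DuckTypingDemo.Run() returns true. A round trip through JSON is simple but comparatively slow, which is one reason a dedicated object mapper is a more natural fit for this job.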

The Merq.AutoMapper package provides one such implementation, based on the excellent AutoMapper library. It can be registered with the DI container as follows:

builder.Services.AddMessageBus<AutoMapperMessageBus>();
// register all services, including handlers and producers
builder.Services.AddServices();

Sponsors

Clarius Org MFB Technologies, Inc. Torutek DRIVE.NET, Inc. Keith Pickford Thomas Bolon Kori Francis Toni Wenzel Uno Platform Reuben Swartz Jacob Foshee Eric Johnson David JENNI Jonathan Charley Wu Ken Bonny Simon Cropp agileworks-eu sorahex Zheyu Shen Vezel ChilliCream 4OTC Vincent Limo Jordan S. Jones domischell Justin Wendlandt Adrian Alonso Michael Hagedorn Matt Frear


Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 was computed. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 

NuGet packages (6)

Showing the top 5 NuGet packages that depend on Merq:

Package Downloads
Merq.Core

Merq: Default Message Bus (Commands + Events) Implementation, for internal application architecture via command and event messages. Only the main application assembly needs to reference this package. Components and extensions can simply reference the interfaces in Merq.

Clide.Installer

Clide Installer: VSIX, MSI and EXE (chained) installer integration.

Merq.VisualStudio

Merq MEF components suitable for hosting with Microsoft.VisualStudio.Composition.

Merq.DependencyInjection

Merq: Microsoft Dependency Injection support with automatic IMessageBus registration via AddMessageBus.

Merq.AutoMapper

A specialized Message Bus that allows cross observing and executing of events and commands from structurally compatible types even if they are from disparate assemblies, as long as their full name is the same, otherwise known as "duck typing".

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
3.0.0-alpha 315 6/17/2025
2.0.0 2,307 1/29/2024
2.0.0-rc.6 925 1/29/2024
2.0.0-rc.5 76 1/27/2024
2.0.0-rc.3 698 7/10/2023
2.0.0-rc.2 120 7/10/2023
2.0.0-rc.1 119 7/7/2023
2.0.0-beta.4 120 7/6/2023
2.0.0-beta.3 293 11/19/2022
2.0.0-beta.2 329 11/18/2022
2.0.0-beta 1,180 11/16/2022
2.0.0-alpha 1,327 11/16/2022
1.5.0 1,793 2/16/2023
1.3.0 1,865 7/28/2022
1.2.0-beta 1,266 7/20/2022
1.2.0-alpha 1,875 7/16/2022
1.1.4 4,151 5/6/2020
1.1.1 27,093 6/15/2018
1.1.0 13,827 6/15/2018
1.0.0 8,781 4/30/2018