MicroPlumberd.CommandBus.Abstractions 1.0.11.54

There is a newer version of this package available.
See the version list below for details.
.NET CLI:
dotnet add package MicroPlumberd.CommandBus.Abstractions --version 1.0.11.54

Package Manager Console:
NuGet\Install-Package MicroPlumberd.CommandBus.Abstractions -Version 1.0.11.54
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference:
<PackageReference Include="MicroPlumberd.CommandBus.Abstractions" Version="1.0.11.54" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Central Package Management (CPM):
For projects that support Central Package Management (CPM), copy the first XML node into the solution Directory.Packages.props file to version the package, and the second into the project file.
Directory.Packages.props:
<PackageVersion Include="MicroPlumberd.CommandBus.Abstractions" Version="1.0.11.54" />
Project file:
<PackageReference Include="MicroPlumberd.CommandBus.Abstractions" />

Paket CLI:
paket add MicroPlumberd.CommandBus.Abstractions --version 1.0.11.54

#r directive:
#r "nuget: MicroPlumberd.CommandBus.Abstractions, 1.0.11.54"
The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

#:package directive:
#:package MicroPlumberd.CommandBus.Abstractions@1.0.11.54
The #:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.

Cake Addin:
#addin nuget:?package=MicroPlumberd.CommandBus.Abstractions&version=1.0.11.54

Cake Tool:
#tool nuget:?package=MicroPlumberd.CommandBus.Abstractions&version=1.0.11.54

micro-plumberd

Micro library for EventStore, CQRS and EventSourcing. Just eXtreamly simple.

Documentation can be found here: MicroPlumberd Documentation

Getting started

Install the NuGet packages:

# For your domain
dotnet add package MicroPlumberd
dotnet add package MicroPlumberd.SourceGenerators

If you'd like to use direct dotnet-to-dotnet communication to execute command handlers, install MicroPlumberd.DirectConnect.

# For application-layer using EventStore as message-bus. 
dotnet add package MicroPlumberd.Services

# For application-layer communicating (dotnet-2-dotnet) using GRPC:
dotnet add package MicroPlumberd.Services.Grpc.DirectConnect

Configure plumber

using EventStore.Client;
using MicroPlumberd;

// Change to your connection string.
string connectionString = "esdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";
var settings = EventStoreClientSettings.Create(connectionString);
var plumber = Plumber.Create(settings);

Aggregates

  1. Write an aggregate.
[Aggregate]
public partial class FooAggregate(Guid id) : AggregateBase<FooAggregate.FooState>(id)
{
    // Exposes the state within the assembly.
    internal new FooState State => base.State;
    public record FooState { public string? Name { get; set; } }

    // Given methods rebuild the state from stored events.
    private static FooState Given(FooState state, FooCreated ev) => state with { Name = ev.Name };
    private static FooState Given(FooState state, FooUpdated ev) => state with { Name = ev.Name };

    // Public methods record new events as pending changes.
    public void Open(string msg) => AppendPendingChange(new FooCreated() { Name = msg });
    public void Change(string msg) => AppendPendingChange(new FooUpdated() { Name = msg });
}
// And events:
public record FooCreated { public string? Name { get; set; } }
public record FooUpdated { public string? Name { get; set; } }

Comments:

  • State is encapsulated in the nested class FooState.
  • The Given methods, which are used when loading the aggregate from EventStoreDB, are private and static. State is encouraged to be immutable.
  • The [Aggregate] attribute is used by the source generator, which generates dispatching code and handy metadata.
  2. Consume an aggregate.

If you want to create a new aggregate and save it to EventStoreDB:


FooAggregate aggregate = FooAggregate.New(Guid.NewGuid());
aggregate.Open("Hello");

await plumber.SaveNew(aggregate);

If you want to load an aggregate from EventStoreDB, change it, and save it back to EventStoreDB:

var aggregate = await plumber.Get<FooAggregate>("YOUR_ID");
aggregate.Change("World");
await plumber.SaveChanges(aggregate);
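
To make the role of the static Given methods more concrete, here is a minimal, library-free sketch of what loading an aggregate amounts to conceptually: folding an ordered event history into an immutable state. It does not use MicroPlumberd; FooRehydration, Apply and Load are hypothetical names, and the hand-written switch merely stands in for the dispatching code that the source generator is said to produce.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Events and state, mirroring the shapes used in the aggregate example.
public record FooCreated { public string? Name { get; init; } }
public record FooUpdated { public string? Name { get; init; } }
public record FooState { public string? Name { get; init; } }

// Hypothetical helper for illustration only; not part of MicroPlumberd.
public static class FooRehydration
{
    // Pure transition functions: old state + event => new state.
    private static FooState Given(FooState state, FooCreated ev) => state with { Name = ev.Name };
    private static FooState Given(FooState state, FooUpdated ev) => state with { Name = ev.Name };

    // Hand-written dispatch by event type (an assumption of this sketch).
    private static FooState Apply(FooState state, object ev) => ev switch
    {
        FooCreated e => Given(state, e),
        FooUpdated e => Given(state, e),
        _ => state // unknown events are ignored in this sketch
    };

    // Replays the history in order, starting from an empty state.
    public static FooState Load(IEnumerable<object> history) =>
        history.Aggregate(new FooState(), Apply);
}
```

Calling FooRehydration.Load with a FooCreated named "Hello" followed by a FooUpdated named "World" returns a state whose Name is "World". Because each Given returns a new record, no intermediate state is mutated.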

Write a read-model/processor

  1. Read-Models
[EventHandler]
public partial class FooModel
{
    private async Task Given(Metadata m, FooCreated ev)
    {
        // your code
    }
    private async Task Given(Metadata m, FooUpdated ev)
    {
        // your code
    }
}

Comments:

  • Read-models have private async Given methods. Since they are async, you can invoke SQL here, or other APIs, to store your model.
  • Metadata contains standard stuff (Created, CorrelationId, CausationId), but can be reconfigured.
var fooModel = new FooModel();
var sub = await plumber.SubscribeModel(fooModel);

// or if you want to persist progress of your subscription
var sub2 = await plumber.SubscribeModelPersistently(fooModel);

With SubscribeModel you can subscribe from start, from certain moment or from the end of the stream.

  2. Processors
[EventHandler]
public partial class FooProcessor(IPlumber plumber)
{
    private async Task Given(Metadata m, FooUpdated ev)
    {
        var agg = FooAggregate.New(Guid.NewGuid());
        agg.Open(ev.Name + " new");
        await plumber.SaveNew(agg);
    }
}

Implementing a processor is technically the same as implementing a read-model, but inside the Given method you would typically invoke a command or execute an aggregate.

Features

Conventions

  • SteamNameConvention - from aggregate type, and aggregate id
  • EventNameConvention - from the (optional) aggregate instance and event instance
  • MetadataConvention - to enrich event with metadata based on aggregate instance and event instance
  • EventIdConvention - from aggregate instance and event instance
  • OutputStreamModelConvention - for output stream name from model-type
  • GroupNameModelConvention - for group name from model-type
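
As a rough illustration of what a convention is, the sketch below models two of them as replaceable delegates. This is not the library's API: SketchConventions, its property names, and the default naming formats are assumptions made for this example, and MicroPlumberd's real defaults may differ.

```csharp
using System;

// Hypothetical container; each convention is a delegate that can be swapped out.
public sealed class SketchConventions
{
    // Stream name derived from the aggregate type and the aggregate id.
    public Func<Type, Guid, string> StreamName { get; set; } =
        (aggregateType, id) => $"{aggregateType.Name}-{id}";

    // Event name derived from the aggregate instance (may be null) and the event instance.
    public Func<object, object, string> EventName { get; set; } =
        (aggregate, ev) => ev.GetType().Name;
}
```

With these assumed defaults, StreamName(typeof(string), Guid.Empty) yields "String-00000000-0000-0000-0000-000000000000". Assigning a different lambda to a property changes the naming everywhere that property is consulted, which is the point of treating naming as a convention instead of hard-coding it.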

Ultra development cycle for Read-Models (EF example).

Imagine this:

  1. You create a read-model that subscribes persistently.
  2. You subscribe it with plumber.
  3. You change something in the event and want to see the new model.
  4. Instead of re-creating the old read-model, you can easily create a new one. Just change MODEL_VER to reflect the new version.

Please note that auto-generating the SQL schema create/drop script will be covered in a different article. (For now we leave it to developers.)

Comments:

  • By creating a new read-model you can always compare the differences with the previous one.
  • You can leverage a canary-deployment strategy and have two versions of your system running in parallel.
[OutputStream(FooModel.MODEL_NAME)]
[EventHandler]
public partial class FooModel : DbContext
{
    internal const string MODEL_VER = "_v1";
    internal const string MODEL_NAME = $"FooModel{MODEL_VER}";
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder
           .Entity<FooEntity>()
           .ToTable($"FooEntities{MODEL_VER}");
    }
    private async Task Given(Metadata m, FooCreated ev)
    {
        // your code
    }
    private async Task Given(Metadata m, FooUpdated ev)
    {
        // your code
    }
}

Subscription Sets - Models ultra-composition

  • You can easily create a stream that joins events together by event-type, and subscribe many read-models at once. In the example below the stream is named 'MasterStream'; it is built from the events consumed by DimentionLookupModel and MasterModel.
  • In this way, you can easily manage the composition and decoupling of read-models. And if you don't wish to decouple read-models, you can reuse your existing ones.
// Given simple models, where the master model has a foreign key used to obtain a value from DimentionLookupModel

var dimentionTable = new DimentionLookupModel();
var factTable = new MasterModel(dimentionTable);

await plumber.SubscribeSet()
    .With(dimentionTable)
    .With(factTable)
    .SubscribeAsync("MasterStream", FromStream.Start);
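
The idea behind a subscription set can be sketched without the library: several models register with one set, the union of their event types defines what the joined stream has to contain, and each incoming event is delivered to every model that handles its type. Everything below (ISketchModel, RecordingModel, SketchSubscriptionSet) is hypothetical and only illustrates the dispatch step; it does not show how MicroPlumberd builds or reads the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a read-model that declares the event types it handles.
public interface ISketchModel
{
    ISet<Type> EventTypes { get; }
    Task Handle(object ev);
}

// Trivial model that just remembers what it received.
public sealed class RecordingModel : ISketchModel
{
    public RecordingModel(params Type[] eventTypes) => EventTypes = new HashSet<Type>(eventTypes);
    public ISet<Type> EventTypes { get; }
    public List<object> Seen { get; } = new();
    public Task Handle(object ev) { Seen.Add(ev); return Task.CompletedTask; }
}

public sealed class SketchSubscriptionSet
{
    private readonly List<ISketchModel> _models = new();

    // Fluent registration, similar in shape to the With(...) calls above.
    public SketchSubscriptionSet With(ISketchModel model)
    {
        _models.Add(model);
        return this;
    }

    // Union of all handled event types: what the joined stream must include.
    public ISet<Type> EventTypes()
    {
        var types = new HashSet<Type>();
        foreach (var model in _models) types.UnionWith(model.EventTypes);
        return types;
    }

    // Delivers one event to every interested model, in registration order.
    public async Task Dispatch(object ev)
    {
        foreach (var model in _models)
            if (model.EventTypes.Contains(ev.GetType()))
                await model.Handle(ev);
    }
}
```

In this sketch, models are invoked in the order they were registered. That ordering is an assumption of the sketch, chosen because a lookup model registered first would already be updated when a dependent model handles the same event.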

EventStoreDB as message-bus

If you want to start as quickly as possible, you can start with EventStoreDB as command-message-bus.


// on the server side:
services.AddPlumberd()
        .AddCommandHandler<FooCommandHandler>();

// on the client side:
ICommandBus bus; // from DI
await bus.SendAsync(Guid.NewGuid(), new CreateFoo() { Name = "Hello" });

GRPC Direct communication

If you prefer direct communication (like a REST API, but without the hassle of contract generation, etc.), you can use direct communication, where the client invokes the command handler using gRPC. The command is not stored in EventStore.

/// Let's configure server:
services.AddCommandHandler<FooCommandHandler>().AddServerDirectConnect();

/// Add mapping to direct-connect service
app.MapDirectConnect();

Here is an example of a command handler code:

[CommandHandler]
public partial class FooCommandHandler(IPlumber plumber)
{

    [ThrowsFaultException<BusinessFault>]
    public async Task Handle(Guid id, CreateFoo cmd)
    {
        if (cmd.Name == "error")
            throw new BusinessFaultException("Foo");

        var agg = FooAggregate.New(id);
        agg.Open(cmd.Name);

        await plumber.SaveNew(agg);
    }

    [ThrowsFaultException<BusinessFault>]
    public async Task<HandlerOperationStatus> Handle(Guid id, ChangeFoo cmd)
    {
        if (cmd.Name == "error")
            throw new BusinessFaultException("Foo");

        var agg = await plumber.Get<FooAggregate>(id);
        agg.Change(cmd.Name);

        await plumber.SaveChanges(agg);
        return HandlerOperationStatus.Ok();
    }
}

And on the client side:

services.AddClientDirectConnect().AddCommandInvokers();

// And invocation
var clientPool = sp.GetRequiredService<IRequestInvokerPool>();
var invoker = clientPool.Get("YOUR_GRPC_URL");
await invoker.Execute(Guid.NewGuid(), new CreateFoo());

Aspects

You can easily inject aspects through the decorator pattern.
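
As a generic illustration of the decorator pattern applied to command handling, the sketch below wraps a handler with a logging aspect. The ISketchHandler interface, DelegateHandler and LoggingDecorator are hypothetical types invented for this example; how MicroPlumberd itself registers or resolves decorators is not shown here.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical handler contract used only in this sketch.
public interface ISketchHandler<TCommand>
{
    Task Handle(Guid id, TCommand cmd);
}

// Adapts a delegate to the contract, so the sketch has something to decorate.
public sealed class DelegateHandler<TCommand> : ISketchHandler<TCommand>
{
    private readonly Func<Guid, TCommand, Task> _handle;
    public DelegateHandler(Func<Guid, TCommand, Task> handle) => _handle = handle;
    public Task Handle(Guid id, TCommand cmd) => _handle(id, cmd);
}

// The aspect: same interface as the inner handler, with logging around the call.
public sealed class LoggingDecorator<TCommand> : ISketchHandler<TCommand>
{
    private readonly ISketchHandler<TCommand> _inner;
    private readonly Action<string> _log;

    public LoggingDecorator(ISketchHandler<TCommand> inner, Action<string> log)
    {
        _inner = inner;
        _log = log;
    }

    public async Task Handle(Guid id, TCommand cmd)
    {
        _log($"Handling {typeof(TCommand).Name} for {id}");
        await _inner.Handle(id, cmd);
        _log($"Handled {typeof(TCommand).Name} for {id}");
    }
}
```

Because the decorator implements the same interface as the handler it wraps, callers do not need to know whether an aspect is present, and several aspects can be stacked by wrapping one decorator in another.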

Compatible and additional computed target framework versions (.NET):
Compatible: net8.0.
Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on MicroPlumberd.CommandBus.Abstractions:

Package Downloads
MicroPlumberd.Services

CQRS/EventSourcing made eXtreamly simple. Application-Layer: Command-Handlers, Command-Bus

MicroPlumberd.ProcessManager.Abstractions

Package Description

MicroPlumberd.Services.Cron

CQRS/EventSourcing made eXtreamly simple. Application-Layer: Command-Handlers, Command-Bus

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.120.153 33 9/16/2025
1.0.118.151 214 9/5/2025
1.0.117.151 122 9/5/2025
1.0.115.150 127 9/5/2025
1.0.114.150 273 3/21/2025
1.0.113.149 160 3/21/2025
1.0.112.149 189 3/20/2025
1.0.111.147 189 3/19/2025
1.0.107.137 144 2/15/2025
1.0.106.136 155 2/14/2025
1.0.105.136 148 2/14/2025
1.0.104.136 140 2/14/2025
1.0.103.135 147 2/14/2025
1.0.102.135 145 2/14/2025
1.0.101.135 165 2/13/2025
1.0.100.134 143 2/12/2025
1.0.99.133 147 2/12/2025
1.0.98.133 147 2/12/2025
1.0.97.132 141 2/12/2025
1.0.96.132 137 2/12/2025
1.0.95.132 139 2/11/2025
1.0.92.130 148 2/11/2025
1.0.91.129 140 2/11/2025
1.0.90.127 133 2/11/2025
1.0.87.127 144 2/3/2025
1.0.86.127 128 2/3/2025
1.0.85.127 139 2/3/2025
1.0.83.127 133 2/3/2025
1.0.81.127 127 1/29/2025
1.0.80.127 122 1/29/2025
1.0.79.127 128 1/28/2025
1.0.78.125 130 1/22/2025
1.0.77.125 163 11/26/2024
1.0.76.125 157 10/12/2024
1.0.75.125 164 10/12/2024
1.0.74.125 141 10/12/2024
1.0.73.124 137 10/12/2024
1.0.72.122 233 6/9/2024
1.0.71.121 165 6/7/2024
1.0.70.121 186 6/6/2024
1.0.69.119 184 5/15/2024
1.0.68.118 181 5/15/2024
1.0.67.118 156 5/15/2024
1.0.66.118 159 5/15/2024
1.0.65.117 162 5/15/2024
1.0.64.116 176 5/14/2024
1.0.63.115 169 5/11/2024
1.0.62.114 178 5/11/2024
1.0.61.113 157 5/11/2024
1.0.60.112 191 5/8/2024
1.0.58.112 183 5/8/2024
1.0.57.111 269 4/26/2024
1.0.55.111 198 4/23/2024
1.0.54.110 192 4/23/2024
1.0.53.110 177 4/23/2024
1.0.51.109 174 4/22/2024
1.0.50.109 170 4/22/2024
1.0.49.108 174 4/22/2024
1.0.48.108 174 4/20/2024
1.0.46.107 184 4/20/2024
1.0.45.106 180 4/20/2024
1.0.44.106 173 4/20/2024
1.0.43.100 160 4/17/2024
1.0.41.97 181 4/11/2024
1.0.40.95 166 4/11/2024
1.0.39.94 181 4/11/2024
1.0.37.94 164 4/9/2024
1.0.36.93 174 4/9/2024
1.0.35.90 170 4/8/2024
1.0.34.87 171 4/8/2024
1.0.29.85 167 4/7/2024
1.0.26.83 157 4/7/2024
1.0.20.72 187 3/24/2024
1.0.19.71 172 3/24/2024
1.0.17.71 188 3/24/2024
1.0.16.71 182 3/23/2024
1.0.15.70 160 3/23/2024
1.0.14.57 191 3/21/2024
1.0.13.56 176 3/21/2024
1.0.11.54 187 3/21/2024