Finaps.EventSourcing.Cosmos
0.3.0
See the version list below for details.
dotnet add package Finaps.EventSourcing.Cosmos --version 0.3.0
NuGet\Install-Package Finaps.EventSourcing.Cosmos -Version 0.3.0
<PackageReference Include="Finaps.EventSourcing.Cosmos" Version="0.3.0" />
paket add Finaps.EventSourcing.Cosmos --version 0.3.0
#r "nuget: Finaps.EventSourcing.Cosmos, 0.3.0"
// Install Finaps.EventSourcing.Cosmos as a Cake Addin #addin nuget:?package=Finaps.EventSourcing.Cosmos&version=0.3.0 // Install Finaps.EventSourcing.Cosmos as a Cake Tool #tool nuget:?package=Finaps.EventSourcing.Cosmos&version=0.3.0
Finaps.EventSourcing
Event Sourcing for .NET 6!
This repository is WIP. Breaking API changes are likely to occur before version 1.0.0.
Finaps.EventSourcing
is an implementation of the Event Sourcing Pattern in .NET 6
with a focus on Validity, Clarity & Performance.
Finaps.EventSourcing
supports SQL Server, Postgres and Azure Cosmos DB databases.
All Finaps.EventSourcing packages are available under the Apache Licence 2.0.
Table of Contents
Installation
Entity Framework Core
Alongside CosmosDB, support for relational databases is provided using Entity Framework Core.
Through EF Core, Finaps.EventSourcing
supports SQL Server & PostgreSQL databases.
NuGet Packages
Finaps.EventSourcing.EF is available on NuGet.
> dotnet add package Finaps.EventSourcing.EF
And Depending on which database you are using, make sure to install the right provider
> dotnet add package Microsoft.EntityFrameworkCore.SqlServer
or
> dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
Database & DBContext Setup
Like most Entity Framework Core applications, the database is managed by Migrations.
The Finaps.EventSourcing.EF
package adds migrations based on the Records (Events/Snapshots/Projections) you have defined and you are responsible for updating the database using them.
To access this functionality, you have to configure a DbContext which inherits from the RecordContext
class.
You can use the OnModelCreating
method to override or add new Entities to the context.
ASP.Net Core Configuration
Your DbContext
is configured in the same way as any other, refer to the Microsoft docs on how to do this,
but your configuration could look something like this:
// appsettings.json
{
"ConnectionStrings": {
"RecordStore": "<SQL Server/Postgres Connection String>"
}
}
// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<ViewContext>(options =>
{
options.UseSqlServer(configuration.GetConnectionString("RecordStore"));
// or
options.UseNpgsql(configuration.GetConnectionString("RecordStore"));
});
services.AddScoped<IRecordStore, EntityFrameworkRecordStore>();
services.AddScoped<IAggregateService, AggregateService>();
}
Now you can use the EntityFrameworkRecordStore
and AggregateService
to power your backend!
Cosmos DB
Nuget Package
Finaps.EventSourcing.Cosmos is available on NuGet.
> dotnet add package Finaps.EventSourcing.Cosmos
Database Setup
Finaps.EventSourcing supports Azure Cosmos DB. To create a Cosmos DB Account, Database and Container, checkout the Microsoft Documentation on Creating Cosmos DB Resources.
For local development, one can use the Docker Cosmos Emulator for Linux/macOS or Windows.
When Creating a Cosmos DB Container to use with Finaps.EventSourcing.Cosmos
, make sure to set PartitionKey
equal to /PartitionId
.
ASP.Net Core Configuration
// appsettings.json
{
"Cosmos": {
"ConnectionString": "<Cosmos Connection String>",
"Database": "<Cosmos Database Name>",
"Container": "<Cosmos Container Name>"
}
}
// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.Configure<CosmosEventStoreOptions>(Configuration.GetSection("Cosmos"));
services.AddSingleton<IRecordStore, CosmosRecordStore>();
services.AddSingleton<IAggregateService, AggregateService>();
}
Now you can use the CosmosRecordStore
and AggregateService
to power your backend!
Basic Usage
These examples show how a (very simplified) bank account could be modelled using Finaps.EventSourcing
.
It shows how to use the three types of Records this package is concerned with: Events, Snapshots and Projections.
These examples work with both Finaps.EventSourcing.Cosmos
and Finaps.EventSourcing.EF
Checkout the Example Project for a more thorough example on how this package can be used.
1. Define Domain Events
Events are immutable Records that describe something that has happened to a particular Aggregate.
public record BankAccountCreatedEvent(string Name, string Iban) : Event<BankAccount>;
public record FundsDepositedEvent(decimal Amount) : Event<BankAccount>;
public record FundsWithdrawnEvent(decimal Amount) : Event<BankAccount>;
public record FundsTransferredEvent(decimal Amount, Guid DebtorAccount, Guid CreditorAccon : Event<BankAccount>;
Note:
- Events are scoped to a particular Aggregate class, specified by
Event<TAggregate>
- Events should be immutable, so either:
- use positional records**
- use
{ get; init; }
accessors
2. Define Aggregate
An Aggregate is an aggregation of one or more Events.
public class BankAccount : Aggregate<BankAccount>
{
public string Name { get; private set; }
public string Iban { get; private set; }
public decimal Balance { get; private set; }
protected override void Apply(Event<BankAccount> e)
{
switch (e)
{
case BankAccountCreatedEvent created:
Name = created.Name;
Iban = created.Iban;
break;
case FundsDepositedEvent deposit:
Balance += deposit.Amount;
break;
case FundsWithdrawnEvent withdraw:
Balance -= withdraw.Amount;
break;
case BankAccountFundsTransferredEvent transfer when transfer.DebtorAccount == Id:
Balance -= transfer.Amount;
break;
case BankAccountFundsTransferredEvent transfer when transfer.CreditorAccount == Id:
Balance += transfer.Amount;
break;
case BankAccountFundsTransferredEvent:
throw new InvalidOperationException("Not debtor nor creditor of this transaction");
default:
throw new ArgumentOutOfRangeException(nameof(e));
}
// An error is thrown if any event would cause the bank account balance to drop below 0
if (Balance < 0) throw new InvalidOperationException("Not enough funds");
}
}
Note:
- The
Aggregate.Apply
method contains the logic for aggregating Events.- Using C# 7+ pattern matching, logic can be added based on Event type and Aggregate State
- Aggregate properties should only be updated by applying Events, hence the
{ get; private set; }
accessors. - Aggregates reference themselves, specified by
Aggregate<TAggregate>
, which enables static type checking for theAggregate.Apply(Event<TAggregate>)
method.
3. Create & Persist an Aggregate
// Create new Bank Account Aggregate
var account = new BankAccount();
// This created a new Aggregate.Id
Assert.NotEqual(Guid.Empty, account.Id);
// But left all other values default
Assert.Equal(default, account.Name);
Assert.Equal(default, account.Iban);
Assert.Equal(default, account.Balance);
// Create the Bank Account by applying an Event
account.Apply(new BankAccountCreatedEvent("E. Vent", "SOME IBAN");
// Add some funds to this account using a convenience method
account.Apply(new FundsDepositedEvent(100));
// By calling the Apply method, the Aggregat is now updated
Assert.Equal("E. Vent" , account.Name);
Assert.Equal("SOME IBAN", account.Iban);
Assert.Equal(100 , account.Balance);
// Finally: Persist the Aggregate
// This will store the newly added Events for this BankAccount in the ```IRecordStore```
await AggregateService.PersistAsync(account);
4. Rehydrate & Update an Aggregate
When you want to update an existing Aggregate, you'll first need to rehydrate the Aggregate:
// Rehydrate existing BankAccount, i.e. apply all stored Events to this BankAccount
var account = await AggregateService.RehydrateAsync<BankAccount>(bankAccountId);
// Then add more funds to the account
account.Apply(new FundsDepositedEvent(50));
// Finally, Persist Aggregate. i.e. store the newly added Event(s)
await AggregateService.PersistAsync(account);
or alternatively, the three lines of code above can be replaced with the shorthand notation:
await AggregateService.RehydrateAndPersistAsync<BankAccount>(bankAccountId,
account => account.Apply(new FundsDepositedEvent(50));
5. Update Aggregates in a Transaction
Let's spice things up and transfer money from one bank account to another. In such a transaction we want to ensure the transaction either entirely succeeds or entirely fails.
Here's where transactions come into play:
// Create another BankAccount
var anotherAccount = new BankAccount();
anotherAccount.Apply(new BankAccountCreatedEvent("S. Ourcing", "ANOTHER IBAN");
// Define a transfer of funds
var transfer = new FundsTransferredEvent(20, account.Id, anotherAccount.Id);
// Add this Event to both Aggregates
account.Apply(transfer);
anotherAccount.Apply(transfer);
// Persist both aggregates in a single ACID transaction.
await AggregateService.PersistAsync(new[] { account, anotherAccount });
6. Create & Apply Snapshots
When many Events are stored for a given Aggregate, rehydrating that Aggregate will get more expensive. The meaning of 'many Events' will depend on backend and database hardware, but also your performance requirements. When performance impacts are expected (or even better, measured!), Snapshots can be used to mitigate them.
Snapshots work by storing a copy of the Aggregate state every n Events. When rehydrating, the latest Snapshot and all Events after that will be used, instead of applying all Events from scratch.
To use Snapshots, first define a Snapshot
:
// A Snapshot represents the full state of an Aggregate at a given point in time
public record BankAccountSnapshot(string Name, string Iban, decimal Balance) : Snapshot<BankAccount>;
Note:
- Like Events, Snapshots are scoped to an Aggregate, specified using
Snapshot<TAggregate>
- Like Events, Snapshots should be immutable: use positional records or
{ get; init; }
accessors.
Next, define a SnapshotFactory
:
// The Snapshot Factory is resposible for creating a Snapshot at a given Event interval
public class BankAccountSnapshotFactory : SnapshotFactory<BankAccount, BankAccountSnapshot>
{
// Create a snapshot every 100 Events
public override long SnapshotInterval => 100;
// Create a Snapshot from the Aggregate
protected override BankAccountSnapshot CreateSnapshot(BankAccount aggregate) =>
new BankAccountSnapshot(aggregate.Name, aggregate.Iban, aggregate.Balance);
}
When Persisting an Aggregate using the AggregateService
a Snapshot
will be created when the SnapshotInterval
is exceeded.
This means that Snapshots will not necessarily be created at exactly SnapshotInterval
increments when applying more Events than one at a time.
After creating the Snapshot
and SnapshotFactory
, we have to apply the Snapshot
in the Aggregate.Apply
method:
public class BankAccount : Aggregate<BankAccount>
{
public string Name { get; private set; }
public string Iban { get; private set; }
public decimal Balance { get; private set; }
protected override void Apply(Snapshot<BankAccount> e)
{
switch (e)
{
case BankAccountSnapshot snapshot:
Name = snapshot.Name;
Iban = snapshot.Iban;
Balance = snapshot.Balance;
break;
}
}
}
7. Point in time Rehydration
Sometimes we want to get the historical state of a particular Aggregate at a given point in time.
This is where Event Sourcing really shines, since it is as easy as applying all Events up to a certain date.
When using Snapshots, the latest Snapshot
before the given date
will be used to speed up these point in time rehydrations as well.
// Query the Bank Account for the January 1th, 2022
var account = await AggregateService.RehydrateAsync<BankAccount>(bankAccountId, new DateTime(2022, 01, 01));
8. Querying Records
All previous examples use the AggregateService
class,
which provides a high level API for rehydrating and persisting Aggregates.
To directly interact with Record
types (Events, Snapshots & Projections), one can use the IRecordStore
.
Some examples of what can be done using the record store:
// Get all Events for a particular Aggregate type
var events = await RecordStore.GetEvents<BankAccount>() // The RecordStore exposes Events/Snapshots/Projections Queryables
.Where(x => x.AggregateId == myAggregateId) // Linq can be used to query all Record types
.OrderBy(x => x.Index) // Order by Aggregate Index
.AsAsyncEnumerable() // Call the AsAsyncEnumerable extension method to finalize the query
.ToListAsync(); // Use any System.Linq.Async method to get the result you want
// Get latest Snapshot for a particular Aggregate
var result = await RecordStore.GetSnapshots<BankAccount>()
.Where(x => x.AggregateId == myAggregateId)
.OrderByDescending(x => x.Index)
.AsAsyncEnumerable()
.FirstAsync();
Not All Linq operations are supported by CosmosDB. For an overview of the supported linq queries in CosmosDB, please refer to the CosmosDB Linq to SQL Translation documentation.
9. Creating & Querying Projections
While Event Sourcing is really powerful, it is not well suited for querying many Aggregates at one time. By creating an easily queryable read-only view of an Aggregate, Projections try to tackle this problem.
Creating Projections works the same as creating Snapshots: just define a Projection
and a ProjectionFactory
:
// A Projection represents the current state of an Aggregate
public record BankAccountProjection(string Name, string Iban) : Projection;
// The Projection factory is responsible for creating a Projection every time the Aggregate is persisted
public class BankAccountProjectionFactory : ProjectionFactory<BankAccount, BankAccountProjection>
{
// This particular projection could be used for e.g. an overview page
// We left out 'Balance' (privacy reasons) and made 'Name' uppercase
// Any transformation could be done here, e.g. to make frontend consumption easier/faster
protected override BankAccountProjection CreateProjection(BankAccount aggregate) =>
new BankAccountProjection(aggregate.Name.ToUpper(), aggregate.Iban);
}
Projections are updated whenever the Aggregate of a particular type are persisted. You can make as many Projections for a given Aggregate type as you like.
To query Projections, use the RecordStore
API:
// Get first 10 BankAccount Projections, ordered by the Bank Account name
var projections = await RecordStore.GetProjections<BankAccountProjection>()
.OrderBy(x => x.Name)
.Skip(0)
.Take(10)
.AsAsyncEnumerable()
.ToListAsync();
Advanced Usage
1. Aggregate References
Note: currently this feature is only available in Finaps.EventSourcing.EF
Aggregates don't usually live in a vacuum, but are related to other Aggregates. However, because Events are the source of truth and Aggregates are never directly persisted, defining foreign keys to ensure data integrity is less trivial than in non-EventSourced systems.
How do we, in the example below, ensure that PostAggregate.BlogId
is actually valid?
public class Blog : Aggregate<Blog>
{
public string Name { get; private set; }
}
public class Post : Aggregate<Post>
{
public Guid BlogId { get; private set; }
public string Content { get; private set; }
}
We can solve this by validating all Events that contain BlogId
public record PostCreated(Guid BlogId, string Content) : Event<Post>;
To do this, add the following line of code for every Aggregate reference to the DbContext.OnModelCreating
:
builder.AggregateReference<PostCreated, Blog>(x => x.BlogId);
This creates a relation between the PostCreated
Event and the first Event of the referenced Blog
.
This technique can be used, alongside other techniques, to increase the data integrity of your application.
Concepts
Records
This package stores three types of Records using the
IRecordStore
:
Events,
Snapshots and
Projections.
Records are always defined with respect to an Aggregate.
The abstract base Record
is defined below:
public abstract record Record
{
public RecordKind Kind { get; } // = Event | Snapshot | Projection
public string Type { get; init; } // = nameof(<MyRecordType>)
public string? AggregateType { get; init; } // = nameof(<MyAggregateType>)
public Guid PartitionId { get; init; } // = Aggregate.PartitionId
public Guid AggregateId { get; init; } // = Aggregate.Id
public Guid RecordId { get; init; } // = Guid.NewGuid()
public DateTimeOffset Timestamp { get; init; } // Event/Snapshot/Projection creation time
}
1. Events
Events are Records that describe what happened to an Aggregate. They are added to an append only store and form the source of truth for an Aggregate.
The base Event
is defined below:
public record Event : Record
{
public long Index { get; init; } // The index of this Event in the Event Stream
}
2. Snapshots
Snapshots are Events that describe the complete state of an Aggregate at a particular Event
index.
Snapshots can be used to speed up the rehydration of Aggregates.
The base Snapshot
is defined below:
public record Snapshot : Event;
3. Projections
Projections are Records that describe the current state of an Aggregate (and hence the Event
stream).
Projections can be used to speed up queries, especially those involving many Aggregates at the same time.
The base Projection
is defined below:
public record Projection : Record
{
public string? FactoryType { get; init; } // = nameof(<MyProjectionFactory>)
public long Version { get; init; } // = Aggregate.Version
public string Hash { get; init; } // Projection Hash Code, see "Updating Projections"
public bool IsUpToDate { get; } // True if Projection is up to date
}
Updating Projections
Unlike Events, Projections are not a source of truth, but depend on the following data:
- The
Event
stream - The
Aggregate.Apply
logic - The
ProjectionFactory.CreateProjection
logic
In order to accurately reflect the current state, Projection
s have to be updated whenever any of these data changes.
The first point, the Event
stream, is trivial to solve: The AggregateService
will simply update the Projection
whenever Events are persisted.
The last two points are less trivial, since they rely on user code.
To provide a solution, the Projection.Hash
stores a hash representation of the IL Bytecode of the methods that define Projections.
When querying projections, we can compare the stored hash to the current hash to see whether the projection was created using up to date code.
Projection.IsUpToDate
reflects this comparison.
Now we know whether a Projection
is out of date, we can actually update it using the following methods:
- Simply
RehydrateAndPersist
the aggregate with the correspondingAgregateType
,PartitionId
andAggregateId
. - Use the
ProjectionUpdateService
to bulk update may Projections at once.
Aggregates
Aggregates are the result of applying one or more Events.
The base Aggregate is defined below:
public abstract class Aggregate
{
public string Type { get; init; } // = nameof(<MyAggregateType>)
public Guid PartitionId { get; init; } // = Guid.Empty (Can be used to partition data)
public Guid Id { get; init; } // Unique Aggregate Identifier
public long Version { get; private set; } // The number of Events applied to this Aggregate
protected abstract void Apply(Event e); // Logic to apply Events
}
SQL vs NoSQL
Finaps.EventSourcing.Core
supports both SQL (SQL Server, Postgres) and NoSQL (CosmosDB) databases.
While the same IRecordStore
API is exposed for all databases, there are differences.
Through the topics Storage, Integrity, Migrations and Performance their respective features are covered.
Storage
This package stores Events, Snapshots and Projections, but the way they are stored differs between NoSQL and SQL databases.
Consider the following Events:
public record BankAccountCreatedEvent(string Name, string Iban) : Event<BankAccount>;
public record BankAccountFundsDepositedEvent(decimal Amount) : Event<BankAccount>;
NoSQL Record Representation
For NoSQL, Events, Snapshots and Projections are stored as JSON in the same collection, which allows for great flexibility when it comes to creating, updating and querying them:
- No database migrations have to be done
- Arbitrary or changing data can be stored and queried
The NoSQL JSON representation of the Bank Account Events mentioned above will look like this:
[{
"AggregateType": "BankAccount",
"Type": "BankAccountCreatedEvent",
"Kind": 1, // RecordKind.Event
// Unique Id, encoding <Kind>|<AggregateId>[<Index>]
"id": "Event|f543d76a-3895-48e2-a836-f09d4a00cd7f[0]",
"PartitionId": "00000000-0000-0000-0000-000000000000",
"AggregateId": "f543d76a-3895-48e2-a836-f09d4a00cd7f",
"Index": 0,
"Timestamp": "2022-03-07T15:29:19.941474+01:00",
"Name": "E. Sourcing",
"Iban": "SOME IBAN"
}, {
"AggregateType": "BankAccount",
"Type": "BankAccountFundsDepositedEvent",
"Kind": 1, // RecordKind.Event
// Unique Id, encoding <Kind>|<AggregateId>[<Index>]
"id": "Event|f543d76a-3895-48e2-a836-f09d4a00cd7f[1]",
"PartitionId": "00000000-0000-0000-0000-000000000000",
"AggregateId": "f543d76a-3895-48e2-a836-f09d4a00cd7f",
"Index": 1,
"Timestamp": "2022-03-07T15:29:19.942085+01:00",
"Amount": 100,
}]
Snapshots and Projections are stored similarly.
SQL Record Representation
SQL is a bit less flexible when storing Events, Snapshots:
- Entity Framework Core Migrations have to be created and applied every time you create/update
Event
,Snapshot
andProjection
models. - Events and Snapshots are stored in a table per Aggregate Type using Table per Hierarchy.
- pro: querying is efficient, since all Events for a given Aggregate are in one table and no joins are needed to rehydrate an Aggregate.
- con: there will be redundant
NULL
columns when they are not applicable for a given Event Type.
The SQL Database representation of the Bank Account Events mentioned above will be:
PartitionId | AggregateId | Index | AggregateType | Type | Timestamp | Name | IBAN | Amount |
---|---|---|---|---|---|---|---|---|
00000000-0000-0000-0000-000000000000 | d85e6b59-add6-46bd-bae9-f7aa0f3140e5 | 0 | BankAccount | BankAccountCreatedEvent | 2022-04-19 12:16:41.213708 +00:00 | E. Vent | SOME IBAN | NULL |
00000000-0000-0000-0000-000000000000 | d85e6b59-add6-46bd-bae9-f7aa0f3140e5 | 1 | BankAccount | BankAccountFundsDepositedEvent | 2022-04-19 12:16:41.215338 +00:00 | NULL | NULL | 100 |
Projections are stored in a unique table per Projection
type.
Integrity
To ensure data integrity in the context of Event Sourcing one has to:
- validate Events
- validate Events w.r.t. Aggregate State
While both validations can be done using C# code in e.g. the Aggregate.Apply
method,
EF Core adds the option to validate Events at database level using
Check Constraints,
Unique Constraints
Foreign Key Constraints
and AggregateReferences.
Migrations
When developing applications, updates to Event models are bound to happen.
Depending on which database powers your EventSourcing (NoSQL Finaps.EventSourcing.Cosmos
or SQL Finaps.EventSourcing.EF
),
special care needs to be taken in order ensure backwards compatibility.
NoSQL
When updating Event models using the Finaps.EventSourcing.Cosmos
package,
all existing Events will remain the way they were written to the database initially.
Your code has to handle both the original as well as the updated Event models.
The following strategies can be used:
When adding properties to an Event record, consider making these properties nullable: this will ensure existing events without these properties are handled correctly in your application logic. You can also specify a default value for the property right on the Event record.
When removing properties from an Event model, no special care has to be taken, they will simply be ignored by the JSON conversion.
When drastically changing your Event model, consider making an entirely new Event model instead and handle both the old and the new in the
Aggregate.Apply
method.When changing data types, ensure that they map to the same json representation. Be very careful when doing this.
SQL
When updating Event models using the Finaps.EventSourcing.EF
package,
all existing Events will be updated with when applying Entity Framework Core Migrations.
Special care has to be taken to not change existing Event data in the database when changing Event models.
For SQL, most NoSQL strategies mentioned above are also applicable, however:
5When adding constraints, you can choose to validate them against all existing Events in the database, allowing you to reason over the validity of all Events as a whole.
Performance
TODO: Performance Testing & Metrics
Example Project
For a more thorough example using the CosmosDB database, check out the Example Project and corresponding Example Tests.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net6.0
- Azure.Core (>= 1.19.0)
- Finaps.EventSourcing.Core (>= 0.3.0)
- Microsoft.Azure.Cosmos (>= 3.21.0)
- Microsoft.Extensions.Options (>= 6.0.0)
- System.Linq.Async (>= 6.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.