Fluens.Messaging.Sagas
0.7.5
Process Manager / Saga abstraction layered on top of Fluens.Messaging. State lives in the module's own
FluensDbContext, the saga write commits in the same SaveChangesAsync as the inbox ACK and any
business outbox row, and the four governing ADRs are 0028 (saga abstraction), 0029 (portable
uint Version concurrency), 0030 (timeouts via deferred outbox), and 0031 (operator surfaces +
saga metrics).
Installation
dotnet add package Fluens.Messaging.Sagas
Usage
Registration
WithSagas(...) is the single entry point hung off MessagingModuleBuilder; invoke it from inside
the AddModuleMessaging<TDbContext> callback so the builder's IServiceCollection + TDbContext
are seeded. Every saga tuple is registered via RegisterSaga<TSaga, TData>(); WithSagas reflects
over each saga's IStartedBy<T> / IHandleSagaMessage<T> / IHandleSagaTimeout<T> markers and
wires one closed-generic SagaDispatcher<TSaga, TData, TMessage> per consumed message type as the
module's IMessageConsumer<TMessage> (ADR-0028 Rule 7).
    module.AddMessaging<OrdersDbContext>(msg =>
    {
        msg.Publishes<OrderCreatedMessage>();
        msg.WithSagas(s =>
        {
            s.RegisterSaga<OrderFulfillmentSaga, OrderFulfillmentData>();
        });
    });
WithSagas(...) auto-wires the SagaSaveChangesInterceptor singleton on every module
FluensDbContext opting into sagas (the interceptor is registered both as its concrete type and as
an ISaveChangesInterceptor enumerable entry so FluensDbContext.OnConfiguring picks it up
without you touching OnConfiguring; mirrors AddOutboxInterceptor's idempotency pattern per
ADR-0029 Rule 4 — the double-registration guard lives on OnConfiguring, not on the extension
method). An empty WithSagas block is a structural no-op (mirrors the AddXxx() convention).
Authoring a saga
A saga is a subclass of Saga<TData>, where TData : SagaData, new(). The base
SagaData carries the infrastructure columns Guid Id / DateTimeOffset CreatedAt /
DateTimeOffset? CompletedAt / [ConcurrencyCheck] uint Version; handler code never reads or
writes them.
    public sealed class OrderFulfillmentData : SagaData
    {
        public Guid OrderId { get; set; }
        public string CustomerEmail { get; set; } = string.Empty;
        public bool PaymentCaptured { get; set; }
        public Guid? OutstandingTimeoutToken { get; set; }
    }

    public sealed class OrderFulfillmentSaga
        : Saga<OrderFulfillmentData>,
          IStartedBy<OrderPlaced>,
          IHandleSagaMessage<PaymentCaptured>,
          IHandleSagaTimeout<PaymentTimeout>
    {
        protected override void Configure(ISagaCorrelation<OrderFulfillmentData> c)
        {
            c.Correlate<OrderPlaced>(m => m.OrderId, d => d.OrderId);
            c.Correlate<PaymentCaptured>(m => m.OrderId, d => d.OrderId);
        }

        public async Task Handle(OrderPlaced msg, ISagaContext ctx, CancellationToken ct)
        {
            Data.OrderId = msg.OrderId;
            Data.CustomerEmail = msg.CustomerEmail;
            Data.OutstandingTimeoutToken =
                ctx.RequestTimeout(new PaymentTimeout(msg.OrderId), TimeSpan.FromMinutes(15));
            await ctx.Publisher.Publish(new RequestPayment(msg.OrderId));
        }

        public Task Handle(PaymentCaptured msg, ISagaContext ctx, CancellationToken ct)
        {
            Data.PaymentCaptured = true;
            MarkAsComplete();
            return Task.CompletedTask;
        }

        public Task Handle(PaymentTimeout payload, ISagaContext ctx, CancellationToken ct)
        {
            var firedToken = Guid.Parse(ctx.Headers["fluens-saga-timeout-token"]);
            if (Data.OutstandingTimeoutToken != firedToken)
                return Task.CompletedTask; // stale firing: silent-ignore

            // compensating action
            return Task.CompletedTask;
        }
    }
Correlation
Saga lookup is by business correlation key declared in Configure via
ISagaCorrelation<TData>.Correlate(messageKey, dataKey). The dispatcher emits a single
SELECT … WHERE <dataKey> = @value and only uses Guid Id lookup when the saga author explicitly
correlates on Id. The pure SagaCorrelationMap<TData> planner is unit-testable without a DB.
A non-start message arriving before its start message follows the silent-ignore path
(ACK + Information log, no orphan row). The start message arriving twice (idempotent inbox retry)
finds the existing row and re-enters Handle, not creating a second row.
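The routing rules in this paragraph can be sketched with an in-memory stand-in for the saga table. This is only an illustration of the three outcomes described here; `CorrelationSketch` and `DispatchOutcome` are hypothetical names, and the real dispatcher issues a database lookup on the correlated column instead of consulting a set.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names, not part of Fluens.Messaging.Sagas.
public enum DispatchOutcome { Created, Handled, Ignored }

// In-memory stand-in for the saga table, keyed by the business correlation value.
public sealed class CorrelationSketch
{
    private readonly HashSet<Guid> _activeRows = new();

    public DispatchOutcome Dispatch(Guid correlationKey, bool isStartMessage)
    {
        // Existing row: re-enter Handle, never create a second row.
        if (_activeRows.Contains(correlationKey))
            return DispatchOutcome.Handled;

        // Non-start message before its start message: silent-ignore, no orphan row.
        if (!isStartMessage)
            return DispatchOutcome.Ignored;

        _activeRows.Add(correlationKey);
        return DispatchOutcome.Created;
    }
}
```

A start message delivered twice yields Created and then Handled, which matches the idempotent inbox retry case.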
SagaPublisherWrapper auto-stamps the fluens-saga-correlation: <sagaType>:<correlationKey>
header on every non-timeout outbound message before publish — cross-app traceability is the
default branch, there is no [CrossAppSaga] opt-in.
Completion and retention
MarkAsComplete() is soft — sets CompletedAt, never deletes the row. The partial pending index
WHERE CompletedAt IS NULL excludes completed rows from the saga's hot-path. Subsequent messages
for a completed correlation key route to silent-ignore (ADR-0028 Rule 6).
Retention of completed rows is handled by the per-saga-type SagaRetentionSweeper<TData> plugged
into the existing per-module HousekeepingWorker via the generic IRetentionSweeper hook
(Fluens.Messaging.Maintenance) — there is no separate saga retention worker. The cutoff is
MessagingOptions.SagaRetentionDays (default 7, validated > 0).
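As a rough illustration of the cutoff, the documented predicate (CompletedAt != null AND CompletedAt < cutoff) can be written as a pure function. `RetentionRule` and `IsEligibleForDeletion` are hypothetical names; the real sweeper performs a batched delete against the module's database and is not shown here.

```csharp
using System;

// Hypothetical helper, not part of Fluens.Messaging.Sagas: it only restates
// the documented predicate (CompletedAt != null AND CompletedAt < cutoff).
public static class RetentionRule
{
    public static bool IsEligibleForDeletion(
        DateTimeOffset? completedAt, DateTimeOffset now, int retentionDays)
    {
        if (retentionDays <= 0)
            throw new ArgumentOutOfRangeException(nameof(retentionDays), "Must be > 0.");

        var cutoff = now.AddDays(-retentionDays);

        // Active sagas (CompletedAt == null) are never swept.
        return completedAt is { } done && done < cutoff;
    }
}
```

With the default of 7 days, a saga completed 8 days ago is eligible, one completed 6 days ago is not, and an active saga is never eligible.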
Timeouts (ADR-0030)
A saga timeout is a message type implementing the empty ISagaTimeout : IMessage marker handled
via IHandleSagaTimeout<TTimeout> where TTimeout : ISagaTimeout. The two markers
(IHandleSagaMessage<T> and IHandleSagaTimeout<T>) MUST NOT be mixed on one type.
ctx.RequestTimeout<TTimeout>(payload, delay) returns a fresh Guid token; storing it on TData
for the later stale-check is the saga author's responsibility (the framework does not auto-stamp
it onto state). SagaPublisherWrapper stamps fluens-saga-timeout-token: <Guid> +
fluens-directed-module: <currentModule> (the bare module name — ADR-0032 corrected the
prior <currentApp>/<currentModule> shape that mismatched the in-app subscription registry and
dropped every timeout) + the DeliverAfter(delay) offset on every ISagaTimeout outbound, so the
timeout returns to the originating saga module via the existing partitioned inbox. The directed-module header is load-bearing: ISagaTimeout outbound NEVER
crosses the mesh — LocalFanOut.DistributeLocallyAsync honours fluens-directed-module
(ADR-0019), and IngressDistributionProcessor honours it identically (ADR-0012).
The framework does NOT pre-compare the timeout token against any saga field — the saga author
writes the stale-check (if (Data.Token != ctx.Headers["fluens-saga-timeout-token"]) return;).
A stale firing is observable via the saga.timeout.stale counter when the author flags it.
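Because the header value arrives as text while the stored token is a Guid?, the author-side stale-check needs a parse step before the comparison. A minimal sketch follows, assuming the headers are exposed as a string-to-string dictionary (the shape of ctx.Headers is not specified here). `TimeoutTokenCheck` is a hypothetical helper, and treating a missing or malformed header as stale is a choice made for this example, not documented library behaviour.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: the header name comes from the text above, the
// class and method names are illustrative only.
public static class TimeoutTokenCheck
{
    public const string HeaderName = "fluens-saga-timeout-token";

    // Returns true when the firing should be ignored as stale.
    public static bool IsStale(
        Guid? outstandingToken, IReadOnlyDictionary<string, string> headers)
    {
        // Assumption: a missing or malformed header cannot match the stored token.
        if (!headers.TryGetValue(HeaderName, out var raw) || !Guid.TryParse(raw, out var fired))
            return true;

        // Lifted comparison: a null stored token never equals the fired token.
        return outstandingToken != fired;
    }
}
```

Inside a timeout handler this would read something like `if (TimeoutTokenCheck.IsStale(Data.OutstandingTimeoutToken, headers)) return Task.CompletedTask;`.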
ctx.CancelTimeoutAsync(token) is best-effort + race-aware:
DELETE FROM outbox WHERE … AND sent_at IS NULL over the named token; 0 rows affected does NOT
throw. A timeout already delivered cannot be cancelled — the saga's stale-token check is the
load-bearing invariant.
Sub-second timeout precision is OUT of scope — the cadence is
TransportProcessorIntervalSeconds (default 60s) for the outbox-to-inbox leg and
InboxProcessorIntervalSeconds (default 30s) for the inbox-to-handler leg.
Concurrency (ADR-0029)
SagaData.Version is the portable optimistic-concurrency token: [ConcurrencyCheck] uint Version
incremented by SagaSaveChangesInterceptor on EntityState.Modified entries only (Added keeps
Version = 0, the INSERT carries that value). A DbUpdateConcurrencyException raised by the saga
commit propagates to the outer LaneWorker retry chain unchanged — the two-stage RetryPolicy
(ADR-0011) classifies it as transient and replays on a fresh scope. The saga library never catches
the exception, never classifies it as IPermanentConsumeFailure, and never implements its own
retry loop. Each replay attempt is observable on the saga.concurrency_conflict counter.
Provider-specific tokens (byte[] RowVersion / SQL Server rowversion/timestamp, PostgreSQL
xmin) and replacement types (long / int / Guid) are forbidden — portability across SQL
Server / PostgreSQL / future providers is the design goal.
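The version check itself is easy to picture without a database. The sketch below is an in-memory illustration only: `VersionedStore` is a hypothetical class, and InvalidOperationException stands in for the DbUpdateConcurrencyException that EF Core would raise. It shows the two documented behaviours: inserted rows keep Version = 0, and an update succeeds only against the version that was read, bumping it by one.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration only; the real check is done by EF Core against the database.
public sealed class VersionedStore
{
    private readonly Dictionary<Guid, (uint Version, string State)> _rows = new();

    // Added rows keep Version = 0, matching the INSERT behaviour described above.
    public void Insert(Guid id, string state) => _rows.Add(id, (0u, state));

    public uint CurrentVersion(Guid id) => _rows[id].Version;

    // Succeeds only if the caller read the version that is still stored,
    // then bumps it by one, as an UPDATE ... WHERE Version = @expected would.
    public void Update(Guid id, uint expectedVersion, string newState)
    {
        var row = _rows[id];
        if (row.Version != expectedVersion)
            throw new InvalidOperationException(
                $"Concurrency conflict: expected {expectedVersion}, found {row.Version}.");

        _rows[id] = (row.Version + 1, newState);
    }
}
```

Two writers that both read version 0 illustrate the conflict: the first update moves the row to version 1, and the second fails and would be replayed on a fresh scope by the outer retry chain.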
Operator surfaces (ADR-0031 Rule 1)
Three SRP-aligned interfaces mirror the ADR-0014 messaging split — each internal sealed,
per-module, scoped to the module's own FluensDbContext via IServiceScopeFactory. The "Admin"
suffix is forbidden (ADR-0014). The consumer depends on the narrowest interface needed; absence of
a registration is a no-op / empty result, not an exception.
- ISagaMaintenance (read / diagnostics): QuerySagasAsync(SagaFilter) → IReadOnlyList<SagaSummary> (reflection-iterates every registered SagaData subclass via dbContext.Model.GetEntityTypes() + cached MethodInfo.MakeGenericMethod per .claude/rules/di-conventions.md); GetSagaAsync(sagaId) → SagaDetail whose State is a reflection-based projection over EF-mapped properties skipping the four base columns (Id / CreatedAt / CompletedAt / Version).
- ISagaRecovery (replay / retry, forward-looking): ReplaySagaAsync(sagaId) stages a SagaReplaySignal outbox row.
- ISagaControl (destructive override): ForceCompleteSagaAsync(sagaId, reason) sets CompletedAt = now() AND stages a SagaForceCompleteEvent outbox row carrying the operator's reason in one transaction (the event flows through the standard outbox path; consumers that care about saga-completion-audit subscribe normally); CancelTimeoutAsync(token) is best-effort + race-aware.
Diagnostics (ADR-0031 Rule 3)
Saga instruments ride the existing Fluens.Messaging BCL meter: no new static class, no new
meter name. Seven new instruments are added to MessagingMetrics itself, all tagged with module +
saga.type:
- saga.active: absolute ObservableGauge<long> backed by a per-(module, saga.type) ConcurrentDictionary registry (mirroring inbox.lag's pattern, ADR-0015 / ADR-0031 Rule 4; restart-safe, never a +1/-1 delta). Fed by the per-module hosted SagaActiveObserver<TDbContext> (cadence reuses InboxProcessorIntervalSeconds).
- saga.started, saga.completed, saga.timeout.fired, saga.timeout.stale, saga.concurrency_conflict: counters.
- saga.handle.duration: histogram (ms).
The recording methods on MessagingMetrics (SetSagaActive / RemoveSagaActive /
RecordSagaStarted / RecordSagaCompleted / RecordSagaTimeoutFired /
RecordSagaTimeoutStale / RecordSagaConcurrencyConflict / RecordSagaHandleDuration) take
only primitive types so the static class stays type-agnostic (no saga type is referenced from
Fluens.Messaging).
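The absolute-gauge pattern used for saga.active can be sketched with the BCL System.Diagnostics.Metrics types alone. This is not the MessagingMetrics implementation: `SagaActiveGauge` and the meter name "Example.Sagas" are placeholders. The sketch shows only the documented shape: a registry keyed by (module, saga.type) that is overwritten with the latest count and read back by an observable gauge.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Sketch of an absolute observable gauge; the meter name and class are
// placeholders, not the real MessagingMetrics internals.
public sealed class SagaActiveGauge : IDisposable
{
    private readonly Meter _meter = new("Example.Sagas");
    private readonly ConcurrentDictionary<(string Module, string SagaType), long> _active = new();

    public SagaActiveGauge()
    {
        _meter.CreateObservableGauge<long>("saga.active", Observe);
    }

    // Overwrites the last known count, so a restart cannot leave a drifted delta.
    public void Set(string module, string sagaType, long count) =>
        _active[(module, sagaType)] = count;

    public bool Remove(string module, string sagaType) =>
        _active.TryRemove((module, sagaType), out _);

    // Called by the metrics pipeline on each collection; one measurement per registry entry.
    public IEnumerable<Measurement<long>> Observe()
    {
        foreach (var entry in _active)
        {
            yield return new Measurement<long>(
                entry.Value,
                new KeyValuePair<string, object?>("module", entry.Key.Module),
                new KeyValuePair<string, object?>("saga.type", entry.Key.SagaType));
        }
    }

    public void Dispose() => _meter.Dispose();
}
```

Taking only strings and a long in Set and Remove mirrors the point made above about primitive-only recording methods.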
Cross-app caveat
Cross-app traceability is the default branch — there is no [CrossAppSaga] opt-in attribute.
The cross-app posture of a given outbound message is decided per-message-type by the existing
subscription topology; the saga abstraction stamps the correlation header unconditionally.
ISagaTimeout outbound is the single exception: the wrapper auto-stamps fluens-directed-module
so the timeout returns to the originating saga module via the existing partitioned inbox — a
timeout NEVER crosses the mesh to a non-originating app, by structural invariant
(LocalFanOut + IngressDistributionProcessor honour the header symmetrically; the library
does not expose a way to disable this stamping).
Configuration
The saga retention window is bound from the existing "Fluens:Messaging" configuration section:
| Property | Default | Description |
|---|---|---|
| SagaRetentionDays | 7 | Retention in days for terminal saga rows (CompletedAt != null AND CompletedAt < cutoff) before SagaRetentionSweeper<TData> batch-deletes them via the per-module HousekeepingWorker. Must be > 0. |
No new configuration section — the saga library extends the existing MessagingOptions block.
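For orientation, a setting bound from the "Fluens:Messaging" section would typically sit in appsettings.json as nested objects, since the standard .NET configuration system treats ":" as a path separator. The fragment below is a sketch under that assumption; the value 14 is purely illustrative, and any other keys the section may hold are omitted.

```json
{
  "Fluens": {
    "Messaging": {
      "SagaRetentionDays": 14
    }
  }
}
```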
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Fluens (>= 0.7.5)
  - Fluens.Kernel (>= 0.7.5)
  - Fluens.Messaging (>= 0.7.5)
  - Microsoft.EntityFrameworkCore.Relational (>= 10.0.9)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.9)