Mostlylucid.Ephemeral.Patterns.ReactiveFanOut
2.3.2
dotnet add package Mostlylucid.Ephemeral.Patterns.ReactiveFanOut --version 2.3.2
NuGet\Install-Package Mostlylucid.Ephemeral.Patterns.ReactiveFanOut -Version 2.3.2
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Mostlylucid.Ephemeral.Patterns.ReactiveFanOut" Version="2.3.2" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Mostlylucid.Ephemeral.Patterns.ReactiveFanOut" Version="2.3.2" />
<PackageReference Include="Mostlylucid.Ephemeral.Patterns.ReactiveFanOut" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Mostlylucid.Ephemeral.Patterns.ReactiveFanOut --version 2.3.2
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Mostlylucid.Ephemeral.Patterns.ReactiveFanOut, 2.3.2"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Mostlylucid.Ephemeral.Patterns.ReactiveFanOut@2.3.2
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Mostlylucid.Ephemeral.Patterns.ReactiveFanOut&version=2.3.2
#tool nuget:?package=Mostlylucid.Ephemeral.Patterns.ReactiveFanOut&version=2.3.2
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Mostlylucid.Ephemeral.Patterns.ReactiveFanOut
Two-stage reactive pipeline that automatically throttles stage 1 when stage 2 signals backpressure.
dotnet add package mostlylucid.ephemeral.patterns.reactivefanout
Quick Start
using Mostlylucid.Ephemeral.Patterns.ReactiveFanOut;
await using var pipeline = new ReactiveFanOutPipeline<Message>(
stage2Work: async (msg, ct) => await SaveToDbAsync(msg, ct),
preStageWork: async (msg, ct) => await ValidateAsync(msg, ct),
stage1MaxConcurrency: 16,
stage1MinConcurrency: 2,
stage2MaxConcurrency: 4,
backpressureThreshold: 100);
await pipeline.EnqueueAsync(message);
await pipeline.DrainAsync();
All Options
new ReactiveFanOutPipeline<T>(
// Required: stage 2 async work body
stage2Work: async (item, ct) => await ProcessAsync(item, ct),
// Optional: pre-stage work (runs in stage 1 before handoff)
// Default: null (no-op)
preStageWork: async (item, ct) => await ValidateAsync(item, ct),
// Stage 1 max concurrency (scales down under pressure)
// Default: 8
stage1MaxConcurrency: 8,
// Stage 1 min concurrency (floor when throttled)
// Default: 1
stage1MinConcurrency: 1,
// Stage 2 max concurrency (fixed)
// Default: 4
stage2MaxConcurrency: 4,
// Stage 2 pending count that triggers backpressure
// Default: 32
backpressureThreshold: 32,
// Stage 2 pending count that clears backpressure
// Default: 8
reliefThreshold: 8,
// Cooldown between concurrency adjustments (ms)
// Default: 200
adjustCooldownMs: 200,
// Optional shared signal sink
// Default: null (creates internal)
sink: signalSink
)
API Reference
// Enqueue work item
await pipeline.EnqueueAsync(item, ct);
// Check current stage 1 concurrency
int stage1Concurrency = pipeline.Stage1CurrentMaxConcurrency;
// Check stage 2 pending count
int stage2Pending = pipeline.Stage2Pending;
// Drain both stages and dispose
await pipeline.DrainAsync(ct);
await pipeline.DisposeAsync();
How It Works
Stage 1 (Validation/Transform) Stage 2 (Slow I/O)
┌─────────────────────────────┐ ┌─────────────────┐
│ Max: 16, Min: 2 │───>│ Max: 4 │
│ Dynamic based on pressure │ │ Fixed │
└─────────────────────────────┘ └─────────────────┘
│
▼
Pending > 32? ──> Throttle Stage 1
Pending < 8? ──> Restore Stage 1
Signals emitted:
stage2.backpressure- When stage 2 pending exceeds thresholdstage2.failed- When stage 2 work fails
Example: ETL Pipeline
await using var pipeline = new ReactiveFanOutPipeline<Record>(
stage2Work: async (record, ct) =>
{
// Slow database insert
await database.InsertAsync(record, ct);
},
preStageWork: async (record, ct) =>
{
// Fast validation and transform
await ValidateSchema(record, ct);
record.Timestamp = DateTimeOffset.UtcNow;
},
stage1MaxConcurrency: 32,
stage1MinConcurrency: 4,
stage2MaxConcurrency: 8,
backpressureThreshold: 200,
reliefThreshold: 50);
// When DB slows down, Stage 1 throttles automatically
foreach (var record in records)
await pipeline.EnqueueAsync(record);
Example: Monitoring Pipeline State
var sink = new SignalSink();
await using var pipeline = new ReactiveFanOutPipeline<Data>(
stage2Work: ProcessDataAsync,
sink: sink);
// Monitor in background
Task.Run(async () =>
{
while (true)
{
Console.WriteLine($"Stage1 Concurrency: {pipeline.Stage1CurrentMaxConcurrency}");
Console.WriteLine($"Stage2 Pending: {pipeline.Stage2Pending}");
var backpressure = sink.Sense(s => s.Signal == "stage2.backpressure");
if (backpressure.Any())
Console.WriteLine("! Backpressure active");
await Task.Delay(1000);
}
});
Related Packages
| Package | Description |
|---|---|
| mostlylucid.ephemeral | Core library |
| mostlylucid.ephemeral.patterns.backpressure | Simple backpressure |
| mostlylucid.ephemeral.patterns.dynamicconcurrency | Dynamic concurrency |
| mostlylucid.ephemeral.complete | All in one DLL |
License
Unlicense (public domain)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net10.0
- mostlylucid.ephemeral (>= 2.3.2)
-
net8.0
- mostlylucid.ephemeral (>= 2.3.2)
-
net9.0
- mostlylucid.ephemeral (>= 2.3.2)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Mostlylucid.Ephemeral.Patterns.ReactiveFanOut:
| Package | Downloads |
|---|---|
|
mostlylucid.ephemeral.complete
Meta-package that references all Mostlylucid.Ephemeral packages - bounded async execution with signals, atoms, and patterns. Install this single package to get everything. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.3.2 | 88 | 1/9/2026 |
| 2.3.1 | 102 | 1/9/2026 |
| 2.3.1-alpha0 | 85 | 1/9/2026 |
| 2.3.0 | 287 | 1/8/2026 |
| 2.3.0-alpha1 | 92 | 1/8/2026 |
| 2.1.0 | 91 | 1/8/2026 |
| 2.1.0-preview | 88 | 1/8/2026 |
| 2.0.1 | 90 | 1/8/2026 |
| 2.0.0 | 128 | 1/8/2026 |
| 2.0.0-alpha1 | 89 | 1/8/2026 |
| 1.7.1 | 418 | 12/11/2025 |
| 1.6.8 | 433 | 12/9/2025 |
| 1.6.7 | 426 | 12/9/2025 |
| 1.6.6 | 426 | 12/9/2025 |
| 1.6.5 | 432 | 12/9/2025 |
| 1.6.0 | 410 | 12/8/2025 |
| 1.5.0 | 414 | 12/8/2025 |
| 1.3.0 | 292 | 12/7/2025 |
| 1.2.2 | 288 | 12/7/2025 |
| 1.1.0-preview2 | 200 | 12/7/2025 |
| 1.0.0-preview3 | 202 | 12/7/2025 |