Rivulet.Pipeline
2.0.0
dotnet add package Rivulet.Pipeline --version 2.0.0
NuGet\Install-Package Rivulet.Pipeline -Version 2.0.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Rivulet.Pipeline" Version="2.0.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Rivulet.Pipeline" Version="2.0.0" />
<PackageReference Include="Rivulet.Pipeline" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Rivulet.Pipeline --version 2.0.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Rivulet.Pipeline, 2.0.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Rivulet.Pipeline@2.0.0
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Rivulet.Pipeline&version=2.0.0
#tool nuget:?package=Rivulet.Pipeline&version=2.0.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Rivulet.Pipeline
Multi-stage pipeline composition for Rivulet with fluent API, per-stage concurrency, backpressure management between stages, and streaming support.
Features
- Fluent Builder API - Type-safe pipeline construction with IntelliSense support
- Per-Stage Concurrency - Different parallelism levels for each processing stage
- Backpressure Management - Automatic flow control between stages using channels
- Streaming & Buffered Modes - Memory-efficient streaming or materialized results
- Full Rivulet.Core Integration - Retries, circuit breakers, rate limiting, metrics
API
- PipelineBuilder.Create - Create type-safe pipeline builders with fluent API
- SelectParallel - Parallel transform stage with async/sync selectors
- WhereParallel - Parallel filter stage with async/sync predicates
- Batch - Group items into fixed-size batches with optional timeout
- BatchSelectParallel - Batch and transform items in parallel
- SelectManyParallel - Flatten/expand collections in parallel
- Tap - Execute side effects without transforming items
- Buffer - Decouple upstream/downstream with channel-based buffering
- Throttle - Rate limit items using token bucket algorithm
- ExecuteAsync - Execute pipeline and collect all results
- ExecuteStreamAsync - Stream results as IAsyncEnumerable
Quick Start
using Rivulet.Pipeline;
var pipeline = PipelineBuilder.Create<string>("MyPipeline")
.SelectParallel(
async (url, ct) => await httpClient.GetStringAsync(url, ct),
new StageOptions { ParallelOptions = new() { MaxDegreeOfParallelism = 32 } })
.SelectParallel(
(html, ct) => ValueTask.FromResult(ParseHtml(html)),
new StageOptions { ParallelOptions = new() { MaxDegreeOfParallelism = 16 } })
.Batch(100)
.SelectParallel(
async (batch, ct) => { await db.BulkInsertAsync(batch, ct); return batch.Count; },
new StageOptions { ParallelOptions = new() { MaxDegreeOfParallelism = 4 } })
.Build();
// Execute with streaming (memory efficient)
await foreach (var result in pipeline.ExecuteStreamAsync(urls.ToAsyncEnumerable()))
{
Console.WriteLine($"Batch saved: {result} records");
}
// Or materialize all results
var results = await pipeline.ExecuteAsync(urls);
Stage Types
| Stage | Description |
|---|---|
SelectParallel |
Transform each item in parallel |
WhereParallel |
Filter items in parallel |
Batch |
Group items into fixed-size batches |
BatchSelectParallel |
Batch and transform in one stage |
SelectManyParallel |
Flatten collections in parallel |
Tap |
Execute side effects without transformation |
Buffer |
Decouple stages with explicit buffering |
Throttle |
Rate limit items flowing through |
Pipeline-Wide Configuration
var pipeline = PipelineBuilder.Create<string>("ResilientPipeline")
.SelectParallel(ProcessAsync)
.WithRetries(3, strategy: BackoffStrategy.ExponentialJitter)
.WithCircuitBreaker(5, TimeSpan.FromSeconds(30))
.WithRateLimit(100, burstCapacity: 200)
.Build();
Documentation
See the full documentation for detailed guides and API reference.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net8.0
- Rivulet.Core (>= 2.0.0)
-
net9.0
- Rivulet.Core (>= 2.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.0.0 | 92 | 3/24/2026 |