NPipeline.Extensions.Parallelism
0.15.0
NPipeline Extensions Parallelism
NPipeline Extensions Parallelism provides configurable parallel execution strategies for NPipeline pipelines. It lets developers implement efficient parallel processing with backpressure handling, bounded queue policies, and built-in performance metrics.
About NPipeline
NPipeline is a high-performance, extensible data processing framework for .NET that enables developers to build scalable and efficient pipeline-based applications. It provides a rich set of components for data transformation, aggregation, branching, and parallel processing, with built-in support for resilience patterns and error handling.
Installation
dotnet add package NPipeline.Extensions.Parallelism
Requirements
- .NET 8.0, 9.0, or 10.0
- NPipeline core package
- System.Threading.Tasks.Dataflow (automatically included for .NET 8.0)
Execution Strategies
The package provides multiple execution strategies to handle different parallel processing scenarios:
BlockingParallelStrategy
The default strategy that preserves ordering and applies end-to-end backpressure using TPL Dataflow. This strategy is ideal for scenarios requiring flow control and ordered output.
- Preserves input ordering in output
- Applies backpressure to prevent memory buildup
- Configurable queue bounds for input and output
- Best for: Ordered processing, batch jobs, scenarios where data loss is unacceptable
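Since this strategy is described as building on TPL Dataflow, its two key properties, end-to-end backpressure and ordered output, can be observed with the BCL primitives directly. The sketch below uses only System.Threading.Tasks.Dataflow, not NPipeline itself:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A bounded TransformBlock: SendAsync awaits when the queue is full
// (backpressure), and EnsureOrdered keeps output in input order even
// with several parallel workers.
var block = new TransformBlock<int, int>(
    async x =>
    {
        await Task.Delay(1); // simulate uneven per-item work
        return x * 2;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        BoundedCapacity = 8,   // producer awaits when 8 items are queued
        EnsureOrdered = true   // the default; output follows input order
    });

foreach (var i in Enumerable.Range(0, 20))
    await block.SendAsync(i); // backpressure point

block.Complete();

var results = new List<int>();
while (await block.OutputAvailableAsync())
    while (block.TryReceive(out var r))
        results.Add(r);

Console.WriteLine(string.Join(",", results)); // 0,2,4,...,38 in input order
```

Despite four workers finishing at different times, the block re-sequences results, which is the same trade (a little buffering for deterministic output) this strategy makes.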
DropOldestParallelStrategy
A strategy that drops the oldest items when the input queue is full, prioritizing newer data. This strategy is suitable for real-time scenarios where freshness is critical.
- Drops oldest items when queue is full
- Prioritizes newer data for processing
- Inherently unordered output
- Best for: Real-time alerts, live dashboards, scenarios where recent data is most valuable
DropNewestParallelStrategy
A strategy that drops the newest items when the input queue is full, preserving older data. This strategy is suitable for analytics scenarios where historical data is important.
- Drops newest items when queue is full
- Preserves older data in the queue
- Inherently unordered output
- Best for: Analytics, batch processing, scenarios where data completeness is important
ParallelExecutionStrategy
A facade strategy that automatically selects the appropriate concrete implementation based on the configured queue policy. This provides a unified interface while allowing runtime strategy selection.
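The policy-to-strategy mapping can be pictured as follows. This is an illustrative sketch only, with a local stand-in for the `BoundedQueuePolicy` enum; the library's actual selection logic lives inside `ParallelExecutionStrategy` and may differ:

```csharp
using System;

// Hypothetical sketch: how a facade could map the configured queue
// policy to a concrete strategy name. Not the library's implementation.
Console.WriteLine(Select(BoundedQueuePolicy.Block));      // BlockingParallelStrategy
Console.WriteLine(Select(BoundedQueuePolicy.DropOldest)); // DropOldestParallelStrategy

static string Select(BoundedQueuePolicy policy) => policy switch
{
    BoundedQueuePolicy.Block      => "BlockingParallelStrategy",
    BoundedQueuePolicy.DropOldest => "DropOldestParallelStrategy",
    BoundedQueuePolicy.DropNewest => "DropNewestParallelStrategy",
    _ => throw new ArgumentOutOfRangeException(nameof(policy))
};

// Local stand-in mirroring the policy names documented below.
enum BoundedQueuePolicy { Block, DropOldest, DropNewest }
```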
Queue Policies
The package supports different queue policies to handle backpressure when the input queue reaches its capacity:
BoundedQueuePolicy.Block
The default policy that blocks the producer when the queue is full, providing end-to-end flow control.
var options = new ParallelOptions(
    MaxQueueLength: 1000,                  // Maximum queue length
    QueuePolicy: BoundedQueuePolicy.Block,
    OutputBufferCapacity: 500              // Optional output buffer capacity
);
Performance implications:
- Provides natural backpressure to upstream components
- Prevents memory buildup under sustained load
- May increase latency when queues are full
- Ensures no data loss
BoundedQueuePolicy.DropOldest
Drops the oldest items when the queue is full, making room for newer items.
var options = new ParallelOptions(
    MaxQueueLength: 1000,
    QueuePolicy: BoundedQueuePolicy.DropOldest
);
Performance implications:
- Maintains constant memory usage
- Prioritizes data freshness
- May cause data loss under sustained load
- Reduces latency for new items
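The drop-oldest semantics mirror the BCL's `BoundedChannelFullMode.DropOldest` from System.Threading.Channels. This self-contained sketch uses only the BCL, not NPipeline, to show the eviction behaviour:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// A bounded channel with DropOldest: when full, the oldest buffered item
// is evicted to make room, so TryWrite always succeeds and new data wins.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

for (int i = 1; i <= 5; i++)
    channel.Writer.TryWrite(i); // writing 4 evicts 1, writing 5 evicts 2
channel.Writer.Complete();

var remaining = new List<int>();
while (channel.Reader.TryRead(out var item))
    remaining.Add(item);

Console.WriteLine(string.Join(",", remaining)); // 3,4,5
```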
BoundedQueuePolicy.DropNewest
Drops the newest items when the queue is full, preserving older items.
var options = new ParallelOptions(
    MaxQueueLength: 1000,
    QueuePolicy: BoundedQueuePolicy.DropNewest
);
Performance implications:
- Maintains constant memory usage
- Preserves historical data
- May cause data loss for recent items
- Reduces processing variability
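The closest BCL analogue here is `BoundedChannelFullMode.DropWrite`, which discards the incoming (newest) item when the buffer is full. Again a BCL-only sketch, independent of NPipeline:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// DropWrite discards the item being written once the buffer is full,
// preserving the older items already queued.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropWrite
});

for (int i = 1; i <= 5; i++)
    channel.Writer.TryWrite(i); // 4 and 5 are discarded; 1,2,3 remain
channel.Writer.Complete();

var kept = new List<int>();
while (channel.Reader.TryRead(out var item))
    kept.Add(item);

Console.WriteLine(string.Join(",", kept)); // 1,2,3
```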
Usage Examples
Basic Parallel Execution
using NPipeline.Extensions.Parallelism;

// Create a pipeline with parallel execution
var builder = PipelineBuilder.Create();
var pipeline = builder
    .AddSource<MySource, InputData>()
    .AddTransform<MyTransform, InputData, OutputData>()
    .WithBlockingParallelism(builder, maxDegreeOfParallelism: 4)
    .AddSink<MySink, OutputData>()
    .Build();

// Execute the pipeline
await pipeline.ExecuteAsync(context);
Different Queue Policies
// Drop oldest policy for real-time processing
var realtimeTransform = builder
    .AddTransform<RealtimeTransform, SensorData, AlertData>()
    .WithDropOldestParallelism(builder,
        maxDegreeOfParallelism: 8,
        maxQueueLength: 100);

// Drop newest policy for analytics
var analyticsTransform = builder
    .AddTransform<AnalyticsTransform, EventData, MetricsData>()
    .WithDropNewestParallelism(builder,
        maxDegreeOfParallelism: 4,
        maxQueueLength: 1000);

// Custom parallel options
var customTransform = builder
    .AddTransform<CustomTransform, InputData, OutputData>()
    .WithParallelism(builder,
        new ParallelOptions(
            MaxDegreeOfParallelism: 6,
            MaxQueueLength: 500,
            QueuePolicy: BoundedQueuePolicy.DropOldest,
            OutputBufferCapacity: 200,
            PreserveOrdering: false),
        new DropOldestParallelStrategy());
Performance Monitoring
// Access parallel execution metrics after pipeline execution
if (context.TryGetParallelMetrics("transformNodeId", out var metrics))
{
    Console.WriteLine($"Processed: {metrics.Processed}");
    Console.WriteLine($"Dropped (oldest): {metrics.DroppedOldest}");
    Console.WriteLine($"Dropped (newest): {metrics.DroppedNewest}");
    Console.WriteLine($"Retry events: {metrics.RetryEvents}");
    Console.WriteLine($"Items with retry: {metrics.ItemsWithRetry}");
    Console.WriteLine($"Max retry attempts: {metrics.MaxItemRetryAttempts}");
}
Custom Parallel Options
// Create a strategy with custom options
var strategy = ParallelExecutionStrategy.Create(new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount * 2,
    MaxQueueLength: 10000,
    QueuePolicy: BoundedQueuePolicy.Block,
    OutputBufferCapacity: 5000,
    PreserveOrdering: true
));

// Apply to a specific node
builder.WithExecutionStrategy(transformNodeHandle, strategy);
Performance Considerations
Strategy Selection Guidelines
| Scenario | Recommended Strategy | Reason |
|---|---|---|
| Batch processing with ordering requirements | BlockingParallelStrategy | Preserves order and prevents data loss |
| Real-time alerts with latency sensitivity | DropOldestParallelStrategy | Prioritizes newest data |
| Analytics with data completeness requirements | DropNewestParallelStrategy | Preserves historical data |
| High-throughput with controlled memory usage | Drop policies | Bounded memory usage |
| Variable load with backpressure needs | BlockingParallelStrategy | Natural flow control |
Memory Usage Patterns
- BlockingParallelStrategy: Memory usage scales with queue size and processing speed differences
- DropOldestParallelStrategy: Constant memory usage bounded by MaxQueueLength
- DropNewestParallelStrategy: Constant memory usage bounded by MaxQueueLength
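For the drop policies, the worst-case buffered memory can be estimated directly from MaxQueueLength. The per-item size below is an assumed illustrative figure, not a measurement:

```csharp
using System;

// Back-of-envelope bound: queue capacity × approximate per-item size.
const int maxQueueLength = 10_000;   // from ParallelOptions.MaxQueueLength
const int approxItemBytes = 1_024;   // assumption: ~1 KB per buffered item
long boundBytes = (long)maxQueueLength * approxItemBytes;
Console.WriteLine($"~{boundBytes / (1024.0 * 1024.0):F1} MB worst case"); // ~9.8 MB
```

The same arithmetic does not apply to BlockingParallelStrategy, whose footprint also depends on how far producers run ahead of consumers before backpressure engages.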
CPU Utilization Optimization
// For CPU-bound workloads
var cpuBoundOptions = new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount,
    QueuePolicy: BoundedQueuePolicy.Block
);

// For I/O-bound workloads
var ioBoundOptions = new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount * 4,
    QueuePolicy: BoundedQueuePolicy.DropOldest,
    MaxQueueLength: 1000
);

// For mixed workloads
var mixedOptions = new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount * 2,
    QueuePolicy: BoundedQueuePolicy.Block,
    MaxQueueLength: 500,
    OutputBufferCapacity: 250
);
License
MIT License - see LICENSE file for details.
| Product | Compatible target frameworks |
|---|---|
| .NET | net8.0, net9.0, and net10.0 are compatible. Platform-specific variants (android, browser, ios, maccatalyst, macos, tvos, windows) are computed for each. |
Dependencies

net8.0
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.3)
- NPipeline (>= 0.15.0)
- System.Threading.Tasks.Dataflow (>= 10.0.3)

net9.0
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.3)
- NPipeline (>= 0.15.0)

net10.0
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.3)
- NPipeline (>= 0.15.0)