NPipeline.Extensions.Parallelism 0.15.0

There is a newer version of this package available.
See the version list below for details.
dotnet add package NPipeline.Extensions.Parallelism --version 0.15.0
                    
NuGet\Install-Package NPipeline.Extensions.Parallelism -Version 0.15.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="NPipeline.Extensions.Parallelism" Version="0.15.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="NPipeline.Extensions.Parallelism" Version="0.15.0" />
                    
Directory.Packages.props
<PackageReference Include="NPipeline.Extensions.Parallelism" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add NPipeline.Extensions.Parallelism --version 0.15.0
                    
#r "nuget: NPipeline.Extensions.Parallelism, 0.15.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package NPipeline.Extensions.Parallelism@0.15.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=NPipeline.Extensions.Parallelism&version=0.15.0
                    
Install as a Cake Addin
#tool nuget:?package=NPipeline.Extensions.Parallelism&version=0.15.0
                    
Install as a Cake Tool

NPipeline Extensions Parallelism

NPipeline Extensions Parallelism is a high-performance package that provides configurable parallel execution strategies for NPipeline pipelines. This package enables developers to implement efficient parallel processing with configurable backpressure handling, queue policies, and performance monitoring capabilities.

About NPipeline

NPipeline is a high-performance, extensible data processing framework for .NET that enables developers to build scalable and efficient pipeline-based applications. It provides a rich set of components for data transformation, aggregation, branching, and parallel processing, with built-in support for resilience patterns and error handling.

Installation

dotnet add package NPipeline.Extensions.Parallelism

Requirements

  • .NET 8.0, 9.0, or 10.0
  • NPipeline core package
  • System.Threading.Tasks.Dataflow (automatically included for .NET 8.0)

Execution Strategies

The package provides multiple execution strategies to handle different parallel processing scenarios:

BlockingParallelStrategy

The default strategy that preserves ordering and applies end-to-end backpressure using TPL Dataflow. This strategy is ideal for scenarios requiring flow control and ordered output.

  • Preserves input ordering in output
  • Applies backpressure to prevent memory buildup
  • Configurable queue bounds for input and output
  • Best for: Ordered processing, batch jobs, scenarios where data loss is unacceptable

DropOldestParallelStrategy

A strategy that drops the oldest items when the input queue is full, prioritizing newer data. This strategy is suitable for real-time scenarios where freshness is critical.

  • Drops oldest items when queue is full
  • Prioritizes newer data for processing
  • Inherently unordered output
  • Best for: Real-time alerts, live dashboards, scenarios where recent data is most valuable

DropNewestParallelStrategy

A strategy that drops the newest items when the input queue is full, preserving older data. This strategy is suitable for analytics scenarios where historical data is important.

  • Drops newest items when queue is full
  • Preserves older data in the queue
  • Inherently unordered output
  • Best for: Analytics, batch processing, scenarios where data completeness is important

ParallelExecutionStrategy

A facade strategy that automatically selects the appropriate concrete implementation based on the configured queue policy. This provides a unified interface while allowing runtime strategy selection.

Queue Policies

The package supports different queue policies to handle backpressure when the input queue reaches its capacity:

BoundedQueuePolicy.Block

The default policy that blocks the producer when the queue is full, providing end-to-end flow control.

var options = new ParallelOptions(
    MaxQueueLength: 1000,        // Maximum queue length
    QueuePolicy: BoundedQueuePolicy.Block,
    OutputBufferCapacity: 500    // Optional output buffer capacity
);

Performance implications:

  • Provides natural backpressure to upstream components
  • Prevents memory buildup under sustained load
  • May increase latency when queues are full
  • Ensures no data loss

BoundedQueuePolicy.DropOldest

Drops the oldest items when the queue is full, making room for newer items.

var options = new ParallelOptions(
    MaxQueueLength: 1000,
    QueuePolicy: BoundedQueuePolicy.DropOldest
);

Performance implications:

  • Maintains constant memory usage
  • Prioritizes data freshness
  • May cause data loss under sustained load
  • Reduces latency for new items

BoundedQueuePolicy.DropNewest

Drops the newest items when the queue is full, preserving older items.

var options = new ParallelOptions(
    MaxQueueLength: 1000,
    QueuePolicy: BoundedQueuePolicy.DropNewest
);

Performance implications:

  • Maintains constant memory usage
  • Preserves historical data
  • May cause data loss for recent items
  • Reduces processing variability

Usage Examples

Basic Parallel Execution

using NPipeline.Extensions.Parallelism;

// Create a pipeline with parallel execution
var pipeline = PipelineBuilder.Create()
    .AddSource<MySource, InputData>()
    .AddTransform<MyTransform, InputData, OutputData>()
    .WithBlockingParallelism(builder, maxDegreeOfParallelism: 4)
    .AddSink<MySink, OutputData>()
    .Build();

// Execute the pipeline
await pipeline.ExecuteAsync(context);

Different Queue Policies

// Drop oldest policy for real-time processing
var realtimeTransform = builder
    .AddTransform<RealtimeTransform, SensorData, AlertData>()
    .WithDropOldestParallelism(builder,
        maxDegreeOfParallelism: 8,
        maxQueueLength: 100);

// Drop newest policy for analytics
var analyticsTransform = builder
    .AddTransform<AnalyticsTransform, EventData, MetricsData>()
    .WithDropNewestParallelism(builder,
        maxDegreeOfParallelism: 4,
        maxQueueLength: 1000);

// Custom parallel options
var customTransform = builder
    .AddTransform<CustomTransform, InputData, OutputData>()
    .WithParallelism(builder,
        new ParallelOptions(
            MaxDegreeOfParallelism: 6,
            MaxQueueLength: 500,
            QueuePolicy: BoundedQueuePolicy.DropOldest,
            OutputBufferCapacity: 200,
            PreserveOrdering: false),
        new DropOldestParallelStrategy());

Performance Monitoring

// Access parallel execution metrics after pipeline execution
if (context.TryGetParallelMetrics("transformNodeId", out var metrics))
{
    Console.WriteLine($"Processed: {metrics.Processed}");
    Console.WriteLine($"Dropped (oldest): {metrics.DroppedOldest}");
    Console.WriteLine($"Dropped (newest): {metrics.DroppedNewest}");
    Console.WriteLine($"Retry events: {metrics.RetryEvents}");
    Console.WriteLine($"Items with retry: {metrics.ItemsWithRetry}");
    Console.WriteLine($"Max retry attempts: {metrics.MaxItemRetryAttempts}");
}

Custom Parallel Options

// Create a strategy with custom options
var strategy = ParallelExecutionStrategy.Create(new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount * 2,
    MaxQueueLength: 10000,
    QueuePolicy: BoundedQueuePolicy.Block,
    OutputBufferCapacity: 5000,
    PreserveOrdering: true
));

// Apply to a specific node
builder.WithExecutionStrategy(transformNodeHandle, strategy);

Performance Considerations

Strategy Selection Guidelines

Scenario Recommended Strategy Reason
Batch processing with ordering requirements BlockingParallelStrategy Preserves order and prevents data loss
Real-time alerts with latency sensitivity DropOldestParallelStrategy Prioritizes newest data
Analytics with data completeness requirements DropNewestParallelStrategy Preserves historical data
High-throughput with controlled memory usage Drop policies Bounded memory usage
Variable load with backpressure needs BlockingParallelStrategy Natural flow control

Memory Usage Patterns

  • BlockingParallelStrategy: Memory usage scales with queue size and processing speed differences
  • DropOldestParallelStrategy: Constant memory usage bounded by MaxQueueLength
  • DropNewestParallelStrategy: Constant memory usage bounded by MaxQueueLength

CPU Utilization Optimization

// For CPU-bound workloads
var cpuBoundOptions = new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount,
    QueuePolicy: BoundedQueuePolicy.Block
);

// For I/O-bound workloads
var ioBoundOptions = new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount * 4,
    QueuePolicy: BoundedQueuePolicy.DropOldest,
    MaxQueueLength: 1000
);

// For mixed workloads
var mixedOptions = new ParallelOptions(
    MaxDegreeOfParallelism: Environment.ProcessorCount * 2,
    QueuePolicy: BoundedQueuePolicy.Block,
    MaxQueueLength: 500,
    OutputBufferCapacity: 250
);

License

MIT License - see LICENSE file for details.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.18.0 0 2/25/2026
0.17.0 0 2/25/2026
0.16.0 30 2/24/2026
0.15.0 76 2/19/2026
0.14.0 82 2/17/2026
0.13.1 82 2/13/2026
0.13.0 88 2/13/2026
0.12.0 87 2/9/2026
0.11.0 86 2/8/2026
0.10.0 92 2/6/2026
0.9.1 86 2/5/2026
0.9.0 93 2/5/2026
0.8.0 86 2/3/2026
0.7.1 89 2/1/2026
0.7.0 92 1/31/2026
0.6.6 87 1/21/2026
0.6.5 96 1/19/2026
0.6.4 91 1/18/2026
0.6.3 91 1/14/2026
0.6.2 94 1/13/2026
Loading failed