InzSoftwares.NetPipeline 1.0.0

InzSoftwares.NetPipeline


A sophisticated .NET pipeline processing library that enables building complex data processing workflows through a fluent API. The library provides a robust and extensible framework for processing data through a series of configurable, composable, and resilient steps (pipes).

Overview

InzSoftwares.NetPipeline is a .NET 9.0 library that implements a fluent pipeline pattern for processing data through a series of configurable, composable, and resilient steps (pipes). It offers an intuitive API for building data processing pipelines in which each pipe performs a specific transformation or operation on data as it flows through the pipeline.

The architecture follows the pipeline pattern with a fluent builder interface that allows for sequential, parallel, conditional, and sub-pipeline execution. InzSoftwares.NetPipeline provides comprehensive error handling, performance monitoring, cancellation support, and resource sharing capabilities to create robust and maintainable data processing workflows.

Key Features

Sequential Execution

Execute pipes one after another in a specified order with full control over data flow:

var pipeline = new PipelineBuilder<InputData, OutputData>();
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .AttachPipe(new StepThreePipe())
    .Flush();

Parallel Execution

Execute multiple pipes concurrently to improve performance for independent operations:

await pipeline
    .AttachParallelPipes(
        new StepOnePipe(),
        new StepTwoPipe(),
        new StepThreePipe()
    )
    .Flush();

Conditional Execution

Execute pipes based on runtime conditions, allowing for dynamic pipeline behavior:

await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachConditionalPipe(new StepTwoPipe(), context => context.Input.CanExecutePipeTwo)
    .AttachConditionalPipe(new StepThreePipe(), context => !context.Input.CanExecutePipeTwo)
    .Flush();

Sub-Pipelines

Create reusable pipeline components for better composability and maintainability:

var dataProcessingPipeline = new SubPipeline<InputData, OutputData>(subBuilder =>
    subBuilder
        .AttachPipe(new ValidateInputPipe())
        .AttachPipe(new TransformDataPipe())
        .AttachPipe(new EnrichDataPipe())
);

await pipeline
    .AttachSubPipeline(dataProcessingPipeline)
    .Flush();

Resource Sharing and Context Management

Share data between pipes through the context repository with thread-safe operations:

// In one pipe
context.AddResource("userId", 123);

// In another pipe
var userId = context.GetResource<int>("userId");

// Thread-safe operations
context.TryAddResource("cacheKey", "value"); // Returns false if key exists
context.UpdateResource("existingKey", "newValue");

Advanced Error Handling

Comprehensive error handling with multiple policies and strategies for resilience:

  • Policies (applied during execution):

    • Retry Policy: Automatically retry failed operations during execution
    • Circuit Breaker Policy: Prevent cascading failures by temporarily blocking requests when failures exceed a threshold
    • Fallback Policy: Execute alternative operations when the primary pipe fails during execution
  • Recovery Strategies (applied after failure occurs):

    • Circuit Breaker Strategy: Reactive circuit breaker for post-failure recovery
    • Retry With Backoff Strategy: Retry failed operations with configurable delays and exponential backoff after failure occurs

For more detailed information on implementing these error handling techniques, see the Error Handling Guide in the main repository.

Pipeline Validation

Comprehensive validation system to catch configuration issues early:

// Attach a custom validator (optional - if not attached, default validation is used)
var validator = new DefaultPipelineValidator<InputData, OutputData>();
await pipeline.AttachValidator(validator);

// Validate the pipeline configuration
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .ValidateConfiguration(); // This will throw an exception if validation fails

// Or get validation results without throwing
var (errors, warnings) = await pipeline.ValidateConfigurationForResult();
if (errors.Any())
{
    // Handle errors
    Console.WriteLine($"Validation errors: {string.Join(", ", errors)}");
}

// Pipes can declare their resource dependencies to enable validation:
public class MyPipe : IPipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
    {
        // Pipe implementation
        await Task.CompletedTask;
    }
    
    // Declare required resources
    public IEnumerable<string> GetRequiredResources() => ["user_id", "session_token"];
    
    // Declare provided resources
    public IEnumerable<string> GetProvidedResources() => ["processed_data"];
}

Performance Monitoring and Metrics

Detailed performance tracking with comprehensive metrics collection:

await pipeline.SetSource(input)
    .AttachContext(context)
    .EnablePerformanceMetrics("my-correlation-id")
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

Console.WriteLine(context.GetPerformanceMetricsSummary());

// Or access individual metrics
Console.WriteLine($"Total Duration: {context.PerformanceMetrics?.TotalDurationMs:F2} ms");
Console.WriteLine($"Initial Memory: {context.PerformanceMetrics?.MemoryMetrics?.InitialMemoryBytes} bytes");
Console.WriteLine($"Final Memory: {context.PerformanceMetrics?.MemoryMetrics?.FinalMemoryBytes} bytes");

Enhanced Cancellation Support

Granular control over cancellation with proper propagation to sub-pipelines and ICancellablePipe implementation:

var cancellationTokenSource = new CancellationTokenSource();

// Use cancellation with pipeline execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush(cancellationTokenSource.Token);

// Later cancel the operation
cancellationTokenSource.Cancel();

For pipes that need to handle cancellation more gracefully and perform cleanup operations, implement ICancellablePipe:

public class MyCancellablePipe : ICancellablePipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
    {
        // Check for cancellation periodically during long-running operations
        for (int i = 0; i < 100; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Delay(100, cancellationToken);
            // Perform work
        }
    }
    
    public async Task HandleCancellation(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
    {
        // Perform cleanup operations when cancellation is requested
        // This method is called before the pipe execution is terminated
        // allowing for proper resource cleanup and state management
        Console.WriteLine("Performing cleanup before cancellation...");
        await Task.CompletedTask;
    }
}

Type Safety and Async Operations

Full support for strongly-typed generic pipelines with async/await patterns:

public class PipelineBuilder<TIn, TOut> where TOut : class
{
    // Strongly typed pipeline with compile-time safety
}

Installation

The InzSoftwares.NetPipeline library is available as a NuGet package:

Package Manager

Install-Package InzSoftwares.NetPipeline

.NET CLI

dotnet add package InzSoftwares.NetPipeline

Package Reference

<PackageReference Include="InzSoftwares.NetPipeline" Version="x.x.x" />

Getting Started

Prerequisites

  • .NET 9.0 SDK or later

Quick Start

  1. Install the NuGet package into your project as shown above.

  2. Define your data models:

public class InputData
{
    public string Name { get; set; } = string.Empty;
    public int Age { get; set; }
    public string Email { get; set; } = string.Empty;
}

public class OutputData
{
    public bool IsValid { get; set; }
    public string ProcessedName { get; set; } = string.Empty;
    public string HashedEmail { get; set; } = string.Empty;
    public List<string> Warnings { get; set; } = new();
}
  3. Create a pipeline context:
using InzPipeline.Core;

public class ProcessingContext : PipelineContext<InputData, OutputData>
{
    public ProcessingContext()
    {
        Input = new InputData();
        Output = new OutputData();
    }
}
  4. Implement your pipes:
using InzPipeline.Core.Contracts;

public class ValidateInputPipe : IPipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, 
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(context.Input.Name))
        {
            context.Output.Warnings.Add("Name is empty");
        }
        
        context.Output.IsValid = !string.IsNullOrEmpty(context.Input.Email) && context.Input.Age >= 0;
        
        await Task.CompletedTask;
    }
}

public class ProcessNamePipe : IPipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, 
        CancellationToken cancellationToken = default)
    {
        context.Output.ProcessedName = context.Input.Name.ToUpper();
        await Task.CompletedTask;
    }
}
  5. Build and execute your pipeline:
using InzPipeline.Core;

var pipeline = new PipelineBuilder<InputData, OutputData>();
var context = new ProcessingContext();

await pipeline.SetSource(new InputData { Name = "John Doe", Email = "john@example.com", Age = 30 })
    .AttachContext(context)
    .AttachPipe(new ValidateInputPipe())
    .AttachPipe(new ProcessNamePipe())
    .Flush();

Console.WriteLine($"Result: {context.Output.ProcessedName}, Valid: {context.Output.IsValid}");

Detailed Documentation

For comprehensive documentation on specific features and implementation details, visit the main repository:

Usage Examples

Here's a complete example of using the InzSoftwares.NetPipeline library:

1. Define your input and output data models:

public class UserInput
{
    public string Name { get; set; } = string.Empty;
    public int Age { get; set; }
    public string Email { get; set; } = string.Empty;
}

public class UserOutput 
{
    public bool IsValid { get; set; }
    public string ProcessedName { get; set; } = string.Empty;
    public string HashedEmail { get; set; } = string.Empty;
    public List<string> Warnings { get; set; } = new();
}

2. Create a pipeline context:

public class UserProcessingContext : PipelineContext<UserInput, UserOutput>
{
    public UserProcessingContext()
    {
        Input = new UserInput();
        Output = new UserOutput();
    }
}

3. Implement your pipes:

public class ValidateUserPipe : IPipe<UserInput, UserOutput>
{
    public async Task Handle(IPipelineContext<UserInput, UserOutput> context, 
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(context.Input.Name))
        {
            context.Output.Warnings.Add("Name is empty");
        }
        
        context.Output.IsValid = !string.IsNullOrEmpty(context.Input.Email) && context.Input.Age >= 0;
        
        await Task.CompletedTask;
    }
}

public class ProcessUserNamePipe : IPipe<UserInput, UserOutput>
{
    public async Task Handle(IPipelineContext<UserInput, UserOutput> context, 
        CancellationToken cancellationToken = default)
    {
        context.Output.ProcessedName = context.Input.Name.ToUpper();
        await Task.CompletedTask;
    }
}

public class HashEmailPipe : IPipe<UserInput, UserOutput>
{
    public async Task Handle(IPipelineContext<UserInput, UserOutput> context, 
        CancellationToken cancellationToken = default)
    {
        // Hash the email with SHA-256 and store the digest as Base64
        // (requires: using System.Security.Cryptography; using System.Text;)
        var digest = SHA256.HashData(Encoding.UTF8.GetBytes(context.Input.Email));
        context.Output.HashedEmail = Convert.ToBase64String(digest);
        await Task.CompletedTask;
    }
}

4. Build and execute your pipeline:

var pipeline = new PipelineBuilder<UserInput, UserOutput>();
var context = new UserProcessingContext();

await pipeline.SetSource(new UserInput { Name = "John Doe", Email = "john@example.com", Age = 30 })
    .AttachContext(context)
    .AttachPipe(new ValidateUserPipe())
    .AttachPipe(new ProcessUserNamePipe())
    .AttachPipe(new HashEmailPipe())
    .Flush();

Console.WriteLine($"Result: {context.Output.ProcessedName}, Valid: {context.Output.IsValid}");

Advanced Features

Error Handling with Policies

// Retry Policy - automatically retry failed operations during execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithRetryPolicy(new ExternalServicePipe(), maxAttempts: 3, delay: TimeSpan.FromSeconds(1))
    .Flush();

// Circuit Breaker Policy - prevent cascading failures during execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithCircuitBreakerPolicy(new DatabasePipe(), failureThreshold: 5, timeout: TimeSpan.FromMinutes(2))
    .Flush();

// Fallback Policy - execute alternative operations when primary fails during execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithFallbackPolicy(new PrimaryServicePipe(), new FallbackServicePipe())
    .Flush();

Recovery Strategies (After Failure)

// Circuit Breaker Strategy - reactive circuit breaker for post-failure recovery
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithCircuitBreakerStrategy(new UnreliableServicePipe(), failureThreshold: 2, timeout: TimeSpan.FromMinutes(1))
    .Flush();

// Retry With Backoff Strategy - retry failed operations after failure occurs
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithRetryStrategy(new NetworkOperationPipe(), maxAttempts: 3, initialDelay: TimeSpan.FromSeconds(1))
    .Flush();

// Global Recovery Strategy - apply recovery strategy to all pipes in the pipeline
await pipeline.SetSource(input)
    .AttachContext(context)
    .WithRecoveryStrategy(new RetryWithBackoffStrategy<InputData, OutputData>(maxAttempts: 2))
    .AttachPipe(new FlakyOperationPipe())
    .Flush();

Performance Metrics and Monitoring

await pipeline.SetSource(input)
    .AttachContext(context)
    .EnablePerformanceMetrics("my-correlation-id")
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

Console.WriteLine($"Total Duration: {context.PerformanceMetrics.TotalDurationMs}ms");

API Reference

PipelineBuilder<TIn, TOut>

The main entry point for building pipelines. Provides methods to attach pipes, set source data, and configure various pipeline behaviors including sequential, parallel, conditional, and sub-pipeline execution. Supports all error handling mechanisms and performance metrics collection.

PipelineContext<TIn, TOut>

Manages data flow between pipes, contains methods for resource sharing using a ConcurrentDictionary for thread-safe operations, error handling with detailed error tracking, and performance metrics collection. Provides methods for adding, getting, updating, and removing resources from the shared repository.

IPipe<TIn, TOut>

Interface that all pipe implementations must implement. Contains a Handle method that performs the pipe's operation and methods for declaring resource dependencies via GetRequiredResources() and GetProvidedResources().
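Based on the pipe implementations shown earlier, the contract has roughly the following shape. This is a sketch inferred from the examples in this document, not the authoritative definition; consult the package source for exact signatures:

```csharp
// Sketch of the IPipe<TIn, TOut> contract, inferred from the examples above.
public interface IPipe<TIn, TOut>
{
    // Performs this pipe's operation against the shared pipeline context.
    Task Handle(IPipelineContext<TIn, TOut> context, CancellationToken cancellationToken = default);

    // Resource keys this pipe expects to find in the context repository.
    IEnumerable<string> GetRequiredResources();

    // Resource keys this pipe adds to the context repository.
    IEnumerable<string> GetProvidedResources();
}
```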

SubPipeline<TIn, TOut>

Allows for creating reusable pipeline components that can be attached to parent pipelines, enabling composition and reusability of pipeline segments.

ICancellablePipe<TIn, TOut>

Specialized interface for pipes that need to handle cancellation more gracefully by implementing a HandleCancellation method to perform cleanup operations when cancellation is requested.

Configuration Options

  • PipeConfiguration<TIn, TOut>: Fine-grained configuration for individual pipes
  • ErrorHandlingOptions<TIn, TOut>: Global error handling configuration
  • PerformanceMetrics: Detailed performance tracking and metrics collection
  • IPipelineValidator<TIn, TOut>: Pipeline validation before execution

Best Practices

Pipeline Design

  • Keep pipes focused on a single responsibility to maintain testability and maintainability
  • Design pipes to be stateless and idempotent when possible to ensure predictable behavior
  • Use meaningful names for your pipe classes that clearly describe their function
  • Implement ICancellablePipe for long-running operations to allow proper resource cleanup

Error Handling

  • Use retry policies for transient failures (network timeouts, temporary service unavailability)
  • Use circuit breakers for unreliable external services to prevent cascading failures
  • Use fallback policies for graceful degradation when primary functionality is unavailable
  • Set context.ContinueOnFailure = true if you want to continue after an error
  • Use the PipelineErrors collection to access detailed error information
  • Implement proper logging within your pipes
  • Configure appropriate retry attempts and delays to avoid overwhelming failing systems
  • Use exception filtering to target specific error types for different handling strategies
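As a sketch of how the ContinueOnFailure and PipelineErrors bullets combine (the shape of the entries in PipelineErrors is an assumption; check the library's error model for the actual type):

```csharp
// Hypothetical sketch: continue past individual pipe failures,
// then inspect the accumulated errors after the pipeline completes.
context.ContinueOnFailure = true;

await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

foreach (var error in context.PipelineErrors)
{
    // Log each recorded failure instead of aborting the whole pipeline
    Console.WriteLine(error);
}
```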

Performance

  • Use parallel execution for independent operations to improve throughput
  • Implement cancellation token checks in long-running operations to allow early termination
  • Monitor pipeline metrics to identify bottlenecks and optimize performance
  • Enable performance metrics collection during development and in production environments
  • Consider using sub-pipelines for better organization and testability of complex pipelines

Resource Management

  • Use the context's resource repository for sharing data between pipes efficiently
  • Always use unique keys when adding resources to avoid conflicts in the shared repository
  • Clean up resources when no longer needed to prevent memory leaks
  • Use TryAddResource when you want to avoid exceptions if a resource already exists
  • Prefer resource sharing over complex data structures when pipes need to communicate

Validation and Testing

  • Implement custom validators for complex pipeline validation rules
  • Use GetRequiredResources and GetProvidedResources to enable automatic dependency validation
  • Test individual pipes in isolation as well as complete pipeline flows
  • Validate pipeline configuration before execution in production environments

Performance Monitoring

InzSoftwares.NetPipeline provides comprehensive performance monitoring capabilities:

  • Correlation IDs: Track requests across distributed systems
  • Execution Time Tracking: Monitor individual pipe and overall pipeline execution times
  • Memory Metrics: Track memory usage before and after pipeline execution
  • Custom Metrics: Add your own metrics for business-specific monitoring
  • Detailed Summaries: Get comprehensive performance metrics summaries

Example Usage:

await pipeline.SetSource(input)
    .AttachContext(context)
    .EnablePerformanceMetrics("my-correlation-id")
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

// Get a summary of all metrics
Console.WriteLine(context.GetPerformanceMetricsSummary());

// Access specific metrics
Console.WriteLine($"Total Duration: {context.PerformanceMetrics.TotalDurationMs}ms");
Console.WriteLine($"Memory Increase: {context.PerformanceMetrics.MemoryMetrics.MemoryIncrease} bytes");

Project Structure

The InzSoftwares.NetPipeline library is part of the InzPipeline project which includes:

InzPipeline/
├── InzPipeline.Core/           # Core library (this package)
│   ├── Cancellation/           # Cancellation support
│   ├── Configuration/          # Pipeline configuration options
│   ├── Contracts/              # Interface definitions
│   ├── ErrorHandling/          # Error handling policies and strategies
│   ├── Models/                 # Data models used internally
│   ├── Steps/                  # Pipeline step implementations
│   ├── Validation/             # Pipeline validation components
│   ├── PipelineBuilder.cs      # Main pipeline orchestration
│   ├── PipelineContext.cs      # Pipeline context management
│   └── SubPipeline.cs          # Sub-pipeline functionality
├── InzPipeline.Sample/         # Sample application demonstrating usage
└── InzPipeline.Tests/          # Unit and integration tests

Dependencies

  • .NET 9.0 or later
  • Microsoft.Extensions.Logging (included as a package reference)
  • Microsoft.Extensions.DependencyInjection (included as a package reference)

Support & Feedback

If you have questions, issues, or suggestions about InzSoftwares.NetPipeline, please:

  1. Check the documentation in the main repository
  2. Open an issue in the GitHub repository if you encounter bugs or have feature requests
  3. Create a pull request if you'd like to contribute improvements
  4. For detailed API documentation, see the code documentation comments

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Create your feature branch: git checkout -b feature/AmazingFeature
  2. Make your changes
  3. Run tests: dotnet test
  4. Commit your changes: git commit -m 'Add some AmazingFeature'
  5. Push to the branch: git push origin feature/AmazingFeature
  6. Open a Pull Request

Code Standards

  • Follow C# coding conventions and .NET best practices
  • Write comprehensive unit tests for all new functionality
  • Update documentation as needed
  • Ensure all tests pass before submitting a pull request

Versioning

We use Semantic Versioning (SemVer) for versioning. For the versions available, see the tags on this repository.

License

This project is licensed under the MIT License - see the LICENSE file for details.


InzSoftwares.NetPipeline - A powerful and flexible pipeline processing library for .NET applications

Release Notes

1.0.0 (released 10/11/2025)

Stable release of InzSoftwares.NetPipeline with pipeline processing capabilities:

  • Sequential, parallel, conditional, and sub-pipeline execution
  • Resource sharing with thread-safe operations between pipes
  • Advanced error handling with policies (retry, circuit breaker, fallback) and recovery strategies (circuit breaker strategy, retry with backoff strategy)
  • Pipeline validation with automatic dependency checking
  • Performance monitoring and metrics collection with correlation IDs
  • Cancellation support with ICancellablePipe implementation for graceful cleanup
  • Fluent API for intuitive pipeline construction
  • Type-safe generic pipelines with async/await support