NPipeline.Extensions.DependencyInjection 0.16.0

dotnet add package NPipeline.Extensions.DependencyInjection --version 0.16.0
                    
NuGet\Install-Package NPipeline.Extensions.DependencyInjection -Version 0.16.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="NPipeline.Extensions.DependencyInjection" Version="0.16.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="NPipeline.Extensions.DependencyInjection" Version="0.16.0" />
                    
Directory.Packages.props
<PackageReference Include="NPipeline.Extensions.DependencyInjection" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add NPipeline.Extensions.DependencyInjection --version 0.16.0
                    
#r "nuget: NPipeline.Extensions.DependencyInjection, 0.16.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package NPipeline.Extensions.DependencyInjection@0.16.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=NPipeline.Extensions.DependencyInjection&version=0.16.0
                    
Install as a Cake Addin
#tool nuget:?package=NPipeline.Extensions.DependencyInjection&version=0.16.0
                    
Install as a Cake Tool

NPipeline Extensions for DependencyInjection

NPipeline.Extensions.DependencyInjection provides seamless integration between NPipeline and Microsoft.Extensions.DependencyInjection, enabling automatic node discovery, service lifetime management, and dependency injection support for pipeline components.

About NPipeline

NPipeline is a high-performance, extensible data processing framework for .NET that enables developers to build scalable and efficient pipeline-based applications. It provides a rich set of components for data transformation, aggregation, branching, and parallel processing, with built-in support for resilience patterns and error handling.

Installation

dotnet add package NPipeline.Extensions.DependencyInjection

Requirements

  • .NET 8.0, 9.0, or 10.0
  • Microsoft.Extensions.DependencyInjection.Abstractions 10.0.0 or later
  • NPipeline core package

Features

  • Automatic Node Discovery: Scan assemblies and automatically register pipeline components
  • Service Lifetime Management: Control the lifetime of nodes and handlers (Transient, Scoped, Singleton)
  • Dependency Injection Support: Inject dependencies into nodes, error handlers, and other components
  • Assembly Scanning: Automatically discover and register pipeline components from specified assemblies
  • Fluent Configuration API: Intuitive builder pattern for configuring NPipeline services
  • Pipeline Execution from Service Provider: Execute pipelines directly from your DI container
  • Error Handler Registration: Register custom error handlers with DI support
  • Safe Type Loading: Robust assembly scanning that handles reflection exceptions gracefully

Usage

Basic Registration

using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;

// Register NPipeline with basic configuration
var services = new ServiceCollection();

services.AddNPipeline(builder =>
{
    builder.AddNode<MyTransformNode>()
           .AddNode<MySourceNode>()
           .AddPipeline<MyDataProcessingPipeline>()
           .AddErrorHandler<MyCustomErrorHandler>();
});

var serviceProvider = services.BuildServiceProvider();

Assembly Scanning

using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;

var services = new ServiceCollection();

// Scan the current assembly for pipeline components
services.AddNPipeline(Assembly.GetExecutingAssembly());

// Or scan multiple assemblies
services.AddNPipeline(
    Assembly.GetExecutingAssembly(),
    typeof(MyOtherNode).Assembly,
    typeof(ExternalPipeline).Assembly);

// Or use the builder with assembly scanning
services.AddNPipeline(builder =>
{
    builder.ScanAssemblies(
        Assembly.GetExecutingAssembly(),
        typeof(MyOtherNode).Assembly);
});

Custom Service Lifetimes

using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddNPipeline(builder =>
{
    // Register nodes with specific lifetimes
    builder.AddNode<SingletonNode>(ServiceLifetime.Singleton)
           .AddNode<ScopedNode>(ServiceLifetime.Scoped)
           .AddNode<TransientNode>(ServiceLifetime.Transient);

    // Register pipelines with specific lifetimes
    builder.AddPipeline<MyPipeline>(ServiceLifetime.Scoped);

    // Register error handlers with specific lifetimes
    builder.AddErrorHandler<MyErrorHandler>(ServiceLifetime.Singleton)
           .AddPipelineErrorHandler<MyPipelineErrorHandler>(ServiceLifetime.Transient);
});

Pipeline Execution from Service Provider

using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;

// Set up services
var services = new ServiceCollection();
services.AddNPipeline(builder =>
{
    builder.AddNode<MySourceNode>()
           .AddNode<MyTransformNode>()
           .AddNode<MySinkNode>()
           .AddPipeline<MyDataPipeline>();
});

var serviceProvider = services.BuildServiceProvider();

// Execute pipeline without parameters
await serviceProvider.RunPipelineAsync<MyDataPipeline>();

// Execute pipeline with parameters
var parameters = new Dictionary<string, object>
{
    ["BatchSize"] = 1000,
    ["ProcessingMode"] = "Fast"
};

await serviceProvider.RunPipelineAsync<MyDataPipeline>(parameters);

Error Handler Registration

using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddNPipeline(builder =>
{
    // Register node-specific error handlers
    builder.AddErrorHandler<MyNodeErrorHandler>()
           .AddErrorHandler<RetryErrorHandler>(ServiceLifetime.Singleton);

    // Register pipeline-level error handlers
    builder.AddPipelineErrorHandler<MyPipelineErrorHandler>();

    // Register dead letter sinks for failed items
    builder.AddDeadLetterSink<MyDeadLetterSink>();

    // Register lineage sinks for tracking
    builder.AddLineageSink<MyLineageSink>()
           .AddPipelineLineageSink<MyPipelineLineageSink>();

    // Register lineage sink providers
    builder.AddLineageSinkProvider<MyLineageSinkProvider>();
});

Configuration

Service Registration Options

The NPipeline service builder provides several options for registering components:

services.AddNPipeline(builder =>
{
    // Nodes
    builder.AddNode<TNode>()                    // Transient lifetime
           .AddNode<TNode>(lifetime);           // Specific lifetime

    // Pipeline Definitions
    builder.AddPipeline<TPipeline>()             // Transient lifetime
           .AddPipeline<TPipeline>(lifetime);   // Specific lifetime

    // Error Handlers
    builder.AddErrorHandler<THandler>()          // Transient lifetime
           .AddErrorHandler<THandler>(lifetime) // Specific lifetime
           .AddPipelineErrorHandler<THandler>()  // Transient lifetime
           .AddPipelineErrorHandler<THandler>(lifetime); // Specific lifetime

    // Sinks
    builder.AddDeadLetterSink<TSink>()           // Transient lifetime
           .AddDeadLetterSink<TSink>(lifetime)  // Specific lifetime
           .AddLineageSink<TSink>()              // Transient lifetime
           .AddLineageSink<TSink>(lifetime)     // Specific lifetime
           .AddPipelineLineageSink<TSink>()      // Transient lifetime
           .AddPipelineLineageSink<TSink>(lifetime); // Specific lifetime

    // Providers
    builder.AddLineageSinkProvider<TProvider>()  // Transient lifetime
           .AddLineageSinkProvider<TProvider>(lifetime); // Specific lifetime
});

Assembly Scanning Configuration

Assembly scanning automatically discovers and registers these component types:

  • Nodes: Classes implementing INode
  • Pipeline Definitions: Classes implementing IPipelineDefinition
  • Error Handlers: Classes implementing INodeErrorHandler or IPipelineErrorHandler
  • Sinks: Classes implementing IDeadLetterSink, ILineageSink, or IPipelineLineageSink
  • Providers: Classes implementing IPipelineLineageSinkProvider
// Scan specific assemblies
services.AddNPipeline(builder =>
{
    builder.ScanAssemblies(
        Assembly.GetExecutingAssembly(),
        typeof(ExternalComponent).Assembly);
});

// Or use the direct method
services.AddNPipeline(
    Assembly.GetExecutingAssembly(),
    typeof(ExternalComponent).Assembly);

Lifetime Management

Choose appropriate service lifetimes based on your requirements:

  • Transient: New instance for every request (default)
  • Scoped: One instance per scope (recommended for most nodes)
  • Singleton: Single instance for the application lifetime
services.AddNPipeline(builder =>
{
    // Use Scoped for stateful nodes that need per-request isolation
    builder.AddNode<StatefulTransformNode>(ServiceLifetime.Scoped);

    // Use Singleton for stateless, thread-safe nodes
    builder.AddNode<ThreadSafeValidatorNode>(ServiceLifetime.Singleton);

    // Use Transient for lightweight nodes
    builder.AddNode<SimpleTransformNode>(); // Uses default Transient
});

Advanced Usage

Custom Node with Dependencies

public class MyTransformNode : ITransformNode<Input, Output>
{
    private readonly ILogger<MyTransformNode> _logger;
    private readonly IValidationService _validator;

    public MyTransformNode(ILogger<MyTransformNode> logger, IValidationService validator)
    {
        _logger = logger;
        _validator = validator;
    }

    public async Task<Output> TransformAsync(Input input, PipelineContext context)
    {
        _logger.LogInformation("Processing item: {ItemId}", input.Id);

        if (!_validator.Validate(input))
            throw new ValidationException("Invalid input");

        return new Output { ProcessedData = input.Data.ToUpper() };
    }
}

// Register with DI
var services = new ServiceCollection();
services.AddLogging();
services.AddSingleton<IValidationService, ValidationService>();

services.AddNPipeline(builder =>
{
    builder.AddNode<MyTransformNode>(ServiceLifetime.Scoped);
});

Pipeline with Configuration

public class ConfigurablePipeline : IPipelineDefinition
{
    private readonly IConfiguration _configuration;

    public ConfigurablePipeline(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    public void Define(PipelineBuilder builder)
    {
        var batchSize = _configuration.GetValue<int>("Pipeline:BatchSize", 100);

        builder.Source<DataSourceNode>()
               .Transform<DataTransformNode>()
               .Batch(batchSize)
               .Sink<DataSinkNode>();
    }
}

// Register with configuration
var services = new ServiceCollection();
services.AddSingleton<IConfiguration>(configuration);

services.AddNPipeline(builder =>
{
    builder.AddPipeline<ConfigurablePipeline>(ServiceLifetime.Scoped);
});

Error Handling with DI

public class DatabaseErrorHandler : INodeErrorHandler
{
    private readonly ILogger<DatabaseErrorHandler> _logger;
    private readonly IErrorRepository _errorRepository;

    public DatabaseErrorHandler(ILogger<DatabaseErrorHandler> logger, IErrorRepository errorRepository)
    {
        _logger = logger;
        _errorRepository = errorRepository;
    }

    public async Task<ErrorHandlingResult> HandleAsync(ErrorContext context, CancellationToken cancellationToken)
    {
        _logger.LogError(context.Exception, "Error processing item");

        await _errorRepository.LogErrorAsync(context, cancellationToken);

        return ErrorHandlingResult.Retry;
    }
}

// Register with DI
services.AddNPipeline(builder =>
{
    builder.AddErrorHandler<DatabaseErrorHandler>(ServiceLifetime.Scoped);
});

License

MIT License - see LICENSE file for details.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.16.0 26 2/24/2026
0.15.0 72 2/19/2026
0.14.0 80 2/17/2026
0.13.1 82 2/13/2026
0.13.0 80 2/13/2026
0.12.0 91 2/9/2026
0.11.0 88 2/8/2026
0.10.0 88 2/6/2026
0.9.1 89 2/5/2026
0.9.0 88 2/5/2026
0.8.0 89 2/3/2026
0.7.1 92 2/1/2026
0.7.0 90 1/31/2026
0.6.6 89 1/21/2026
0.6.5 97 1/19/2026
0.6.4 89 1/18/2026
0.6.3 92 1/14/2026
0.6.2 90 1/13/2026
0.6.1 93 1/13/2026
0.6.0 96 1/13/2026
Loading failed