NPipeline.Extensions.DependencyInjection
0.16.0
dotnet add package NPipeline.Extensions.DependencyInjection --version 0.16.0
NuGet\Install-Package NPipeline.Extensions.DependencyInjection -Version 0.16.0
<PackageReference Include="NPipeline.Extensions.DependencyInjection" Version="0.16.0" />
<PackageVersion Include="NPipeline.Extensions.DependencyInjection" Version="0.16.0" />
<PackageReference Include="NPipeline.Extensions.DependencyInjection" />
paket add NPipeline.Extensions.DependencyInjection --version 0.16.0
#r "nuget: NPipeline.Extensions.DependencyInjection, 0.16.0"
#:package NPipeline.Extensions.DependencyInjection@0.16.0
#addin nuget:?package=NPipeline.Extensions.DependencyInjection&version=0.16.0
#tool nuget:?package=NPipeline.Extensions.DependencyInjection&version=0.16.0
NPipeline Extensions for DependencyInjection
NPipeline.Extensions.DependencyInjection provides seamless integration between NPipeline and Microsoft.Extensions.DependencyInjection, enabling automatic node discovery, service lifetime management, and dependency injection support for pipeline components.
About NPipeline
NPipeline is a high-performance, extensible data processing framework for .NET that enables developers to build scalable and efficient pipeline-based applications. It provides a rich set of components for data transformation, aggregation, branching, and parallel processing, with built-in support for resilience patterns and error handling.
Installation
dotnet add package NPipeline.Extensions.DependencyInjection
Requirements
- .NET 8.0, 9.0, or 10.0
- Microsoft.Extensions.DependencyInjection.Abstractions 10.0.0 or later
- NPipeline core package
Features
- Automatic Node Discovery: Scan assemblies and automatically register pipeline components
- Service Lifetime Management: Control the lifetime of nodes and handlers (Transient, Scoped, Singleton)
- Dependency Injection Support: Inject dependencies into nodes, error handlers, and other components
- Assembly Scanning: Automatically discover and register pipeline components from specified assemblies
- Fluent Configuration API: Intuitive builder pattern for configuring NPipeline services
- Pipeline Execution from Service Provider: Execute pipelines directly from your DI container
- Error Handler Registration: Register custom error handlers with DI support
- Safe Type Loading: Robust assembly scanning that handles reflection exceptions gracefully
Usage
Basic Registration
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;
// Register NPipeline with basic configuration
var services = new ServiceCollection();
services.AddNPipeline(builder =>
{
builder.AddNode<MyTransformNode>()
.AddNode<MySourceNode>()
.AddPipeline<MyDataProcessingPipeline>()
.AddErrorHandler<MyCustomErrorHandler>();
});
var serviceProvider = services.BuildServiceProvider();
Assembly Scanning
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;
var services = new ServiceCollection();
// Scan the current assembly for pipeline components
services.AddNPipeline(Assembly.GetExecutingAssembly());
// Or scan multiple assemblies
services.AddNPipeline(
Assembly.GetExecutingAssembly(),
typeof(MyOtherNode).Assembly,
typeof(ExternalPipeline).Assembly);
// Or use the builder with assembly scanning
services.AddNPipeline(builder =>
{
builder.ScanAssemblies(
Assembly.GetExecutingAssembly(),
typeof(MyOtherNode).Assembly);
});
Custom Service Lifetimes
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;
var services = new ServiceCollection();
services.AddNPipeline(builder =>
{
// Register nodes with specific lifetimes
builder.AddNode<SingletonNode>(ServiceLifetime.Singleton)
.AddNode<ScopedNode>(ServiceLifetime.Scoped)
.AddNode<TransientNode>(ServiceLifetime.Transient);
// Register pipelines with specific lifetimes
builder.AddPipeline<MyPipeline>(ServiceLifetime.Scoped);
// Register error handlers with specific lifetimes
builder.AddErrorHandler<MyErrorHandler>(ServiceLifetime.Singleton)
.AddPipelineErrorHandler<MyPipelineErrorHandler>(ServiceLifetime.Transient);
});
Pipeline Execution from Service Provider
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;
// Set up services
var services = new ServiceCollection();
services.AddNPipeline(builder =>
{
builder.AddNode<MySourceNode>()
.AddNode<MyTransformNode>()
.AddNode<MySinkNode>()
.AddPipeline<MyDataPipeline>();
});
var serviceProvider = services.BuildServiceProvider();
// Execute pipeline without parameters
await serviceProvider.RunPipelineAsync<MyDataPipeline>();
// Execute pipeline with parameters
var parameters = new Dictionary<string, object>
{
["BatchSize"] = 1000,
["ProcessingMode"] = "Fast"
};
await serviceProvider.RunPipelineAsync<MyDataPipeline>(parameters);
Error Handler Registration
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.DependencyInjection;
var services = new ServiceCollection();
services.AddNPipeline(builder =>
{
// Register node-specific error handlers
builder.AddErrorHandler<MyNodeErrorHandler>()
.AddErrorHandler<RetryErrorHandler>(ServiceLifetime.Singleton);
// Register pipeline-level error handlers
builder.AddPipelineErrorHandler<MyPipelineErrorHandler>();
// Register dead letter sinks for failed items
builder.AddDeadLetterSink<MyDeadLetterSink>();
// Register lineage sinks for tracking
builder.AddLineageSink<MyLineageSink>()
.AddPipelineLineageSink<MyPipelineLineageSink>();
// Register lineage sink providers
builder.AddLineageSinkProvider<MyLineageSinkProvider>();
});
Configuration
Service Registration Options
The NPipeline service builder provides several options for registering components:
services.AddNPipeline(builder =>
{
// Nodes
builder.AddNode<TNode>() // Transient lifetime
.AddNode<TNode>(lifetime); // Specific lifetime
// Pipeline Definitions
builder.AddPipeline<TPipeline>() // Transient lifetime
.AddPipeline<TPipeline>(lifetime); // Specific lifetime
// Error Handlers
builder.AddErrorHandler<THandler>() // Transient lifetime
.AddErrorHandler<THandler>(lifetime) // Specific lifetime
.AddPipelineErrorHandler<THandler>() // Transient lifetime
.AddPipelineErrorHandler<THandler>(lifetime); // Specific lifetime
// Sinks
builder.AddDeadLetterSink<TSink>() // Transient lifetime
.AddDeadLetterSink<TSink>(lifetime) // Specific lifetime
.AddLineageSink<TSink>() // Transient lifetime
.AddLineageSink<TSink>(lifetime) // Specific lifetime
.AddPipelineLineageSink<TSink>() // Transient lifetime
.AddPipelineLineageSink<TSink>(lifetime); // Specific lifetime
// Providers
builder.AddLineageSinkProvider<TProvider>() // Transient lifetime
.AddLineageSinkProvider<TProvider>(lifetime); // Specific lifetime
});
Assembly Scanning Configuration
Assembly scanning automatically discovers and registers these component types:
- Nodes: Classes implementing
INode - Pipeline Definitions: Classes implementing
IPipelineDefinition - Error Handlers: Classes implementing
INodeErrorHandlerorIPipelineErrorHandler - Sinks: Classes implementing
IDeadLetterSink,ILineageSink, orIPipelineLineageSink - Providers: Classes implementing
IPipelineLineageSinkProvider
// Scan specific assemblies
services.AddNPipeline(builder =>
{
builder.ScanAssemblies(
Assembly.GetExecutingAssembly(),
typeof(ExternalComponent).Assembly);
});
// Or use the direct method
services.AddNPipeline(
Assembly.GetExecutingAssembly(),
typeof(ExternalComponent).Assembly);
Lifetime Management
Choose appropriate service lifetimes based on your requirements:
- Transient: New instance for every request (default)
- Scoped: One instance per scope (recommended for most nodes)
- Singleton: Single instance for the application lifetime
services.AddNPipeline(builder =>
{
// Use Scoped for stateful nodes that need per-request isolation
builder.AddNode<StatefulTransformNode>(ServiceLifetime.Scoped);
// Use Singleton for stateless, thread-safe nodes
builder.AddNode<ThreadSafeValidatorNode>(ServiceLifetime.Singleton);
// Use Transient for lightweight nodes
builder.AddNode<SimpleTransformNode>(); // Uses default Transient
});
Advanced Usage
Custom Node with Dependencies
public class MyTransformNode : ITransformNode<Input, Output>
{
private readonly ILogger<MyTransformNode> _logger;
private readonly IValidationService _validator;
public MyTransformNode(ILogger<MyTransformNode> logger, IValidationService validator)
{
_logger = logger;
_validator = validator;
}
public async Task<Output> TransformAsync(Input input, PipelineContext context)
{
_logger.LogInformation("Processing item: {ItemId}", input.Id);
if (!_validator.Validate(input))
throw new ValidationException("Invalid input");
return new Output { ProcessedData = input.Data.ToUpper() };
}
}
// Register with DI
var services = new ServiceCollection();
services.AddLogging();
services.AddSingleton<IValidationService, ValidationService>();
services.AddNPipeline(builder =>
{
builder.AddNode<MyTransformNode>(ServiceLifetime.Scoped);
});
Pipeline with Configuration
public class ConfigurablePipeline : IPipelineDefinition
{
private readonly IConfiguration _configuration;
public ConfigurablePipeline(IConfiguration configuration)
{
_configuration = configuration;
}
public void Define(PipelineBuilder builder)
{
var batchSize = _configuration.GetValue<int>("Pipeline:BatchSize", 100);
builder.Source<DataSourceNode>()
.Transform<DataTransformNode>()
.Batch(batchSize)
.Sink<DataSinkNode>();
}
}
// Register with configuration
var services = new ServiceCollection();
services.AddSingleton<IConfiguration>(configuration);
services.AddNPipeline(builder =>
{
builder.AddPipeline<ConfigurablePipeline>(ServiceLifetime.Scoped);
});
Error Handling with DI
public class DatabaseErrorHandler : INodeErrorHandler
{
private readonly ILogger<DatabaseErrorHandler> _logger;
private readonly IErrorRepository _errorRepository;
public DatabaseErrorHandler(ILogger<DatabaseErrorHandler> logger, IErrorRepository errorRepository)
{
_logger = logger;
_errorRepository = errorRepository;
}
public async Task<ErrorHandlingResult> HandleAsync(ErrorContext context, CancellationToken cancellationToken)
{
_logger.LogError(context.Exception, "Error processing item");
await _errorRepository.LogErrorAsync(context, cancellationToken);
return ErrorHandlingResult.Retry;
}
}
// Register with DI
services.AddNPipeline(builder =>
{
builder.AddErrorHandler<DatabaseErrorHandler>(ServiceLifetime.Scoped);
});
License
MIT License - see LICENSE file for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.3)
- NPipeline (>= 0.16.0)
-
net8.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.3)
- NPipeline (>= 0.16.0)
-
net9.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.3)
- NPipeline (>= 0.16.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.16.0 | 26 | 2/24/2026 |
| 0.15.0 | 72 | 2/19/2026 |
| 0.14.0 | 80 | 2/17/2026 |
| 0.13.1 | 82 | 2/13/2026 |
| 0.13.0 | 80 | 2/13/2026 |
| 0.12.0 | 91 | 2/9/2026 |
| 0.11.0 | 88 | 2/8/2026 |
| 0.10.0 | 88 | 2/6/2026 |
| 0.9.1 | 89 | 2/5/2026 |
| 0.9.0 | 88 | 2/5/2026 |
| 0.8.0 | 89 | 2/3/2026 |
| 0.7.1 | 92 | 2/1/2026 |
| 0.7.0 | 90 | 1/31/2026 |
| 0.6.6 | 89 | 1/21/2026 |
| 0.6.5 | 97 | 1/19/2026 |
| 0.6.4 | 89 | 1/18/2026 |
| 0.6.3 | 92 | 1/14/2026 |
| 0.6.2 | 90 | 1/13/2026 |
| 0.6.1 | 93 | 1/13/2026 |
| 0.6.0 | 96 | 1/13/2026 |