NPipeline 0.52.0
dotnet add package NPipeline --version 0.52.0
NuGet\Install-Package NPipeline -Version 0.52.0
<PackageReference Include="NPipeline" Version="0.52.0" />
<PackageVersion Include="NPipeline" Version="0.52.0" />
<PackageReference Include="NPipeline" />
paket add NPipeline --version 0.52.0
#r "nuget: NPipeline, 0.52.0"
#:package NPipeline@0.52.0
#addin nuget:?package=NPipeline&version=0.52.0
#tool nuget:?package=NPipeline&version=0.52.0
NPipeline
High-performance, streaming data pipelines for .NET
NPipeline is a modern .NET library that makes building complex data processing workflows simple and efficient. Whether you're building AI-driven data pipelines, ETL workflows, processing real-time data streams, or crafting sophisticated data transformation workflows, NPipeline provides the tools you need with minimal ceremony.
Why Choose NPipeline?
Built for Performance
Stream data efficiently using IAsyncEnumerable<T> without loading entire datasets into memory. Perfect for processing large files, database results, or
real-time data streams. NPipeline is designed with performance in mind:
Streaming architecture minimizes memory allocations and GC pressure
Zero-reflection execution model with pre-compiled delegates and cached execution plans
Optimized for high-throughput using
ValueTask<T>to eliminate heap allocations for synchronous operations
Graph-Based Architecture
Create sophisticated workflows with multiple data sources, parallel processing, joins, and branching operations. Build complex DAGs (Directed Acyclic Graphs) with ease.
Type-Safe by Design
Strongly-typed nodes with compile-time validation prevent common runtime errors. Your IDE will catch type mismatches before you run your code.
Build-Time Guardrails
Roslyn analyzers flag misconfigured retries, blocking calls, silent data loss, and performance anti-patterns before your pipeline ever runs.
Production-Ready Resilience
Built-in error handling with retry strategies, circuit breakers, and dead letter queues. Create robust pipelines that handle failures gracefully with intelligent backoff and jitter patterns to prevent thundering herd problems.
Designed for Testing
Test each node independently. Pass in test data, assert on outputs. Use the testing extensions to run entire pipelines in memory. First-class support for FluentAssertions and AwesomeAssertions.
Use Cases
NPipeline excels in scenarios where you need to process data efficiently and reliably:
ETL Pipelines - Extract data from databases, APIs, or files; transform it; load it into data warehouses
Data Migration - Move and transform data between systems with validation and error handling
Real-time Processing - Process streaming data from message queues, IoT devices, or APIs
File Processing - Parse, validate, and transform large CSV, JSON, or XML files
API Integration - Fetch data from multiple APIs, combine results, and sync to your systems
Data Validation - Clean and validate data with complex business rules and error reporting
Report Generation - Aggregate data from multiple sources and generate reports or exports
Batch Processing - Process large datasets efficiently with parallel execution and state management
AI Data Enrichment & Intelligent Routing - Augment records with LLM-generated classifications, summaries, or entity extractions, then route to the right downstream system based on content, confidence scores, or business rules
Key Concepts
Nodes are the building blocks of your pipeline:
- Sources generate or fetch data (
SourceNode<TOut>) - Transforms process data item by item (
TransformNode<TIn, TOut>) - Sinks consume data and perform final operations (
SinkNode<TIn>)
Data Streams transport data between nodes as strongly-typed async streams (IDataStream<T>).
Pipeline Context provides logging, cancellation, error handling, and shared state without carrying data payloads.
Execution Strategies control how nodes process data: sequential, parallel, or batched processing. Mix strategies within the same pipeline.
Prerequisites
NPipeline requires .NET 8.0, .NET 9.0 or .NET 10.0 to take advantage of the latest performance improvements and language features.
The core library is dependency-free.
Quick Start
Install the core package to get started:
dotnet add package NPipeline
dotnet add package NPipeline.Extensions.DependencyInjection
Additionally, install the Analyzer package to enable Roslyn analyzers that provide compile-time checks and guardrails:
dotnet add package NPipeline.Analyzers
Simple Example
Here's a basic pipeline that processes customer orders:
// 1. Define your data models
public record Order(int Id, string Customer, decimal Amount);
public record ProcessedOrder(int Id, string Customer, decimal Amount, decimal Tax, decimal Total);
// 2. Create source, transform and sink nodes
public class OrderSource : SourceNode<Order>
{
public override IDataStream<Order> OpenStream(PipelineContext context, CancellationToken cancellationToken)
{
var orders = new[] { new Order(1, "Alice", 100m), new Order(2, "Bob", 250m), new Order(3, "Carol", 75m) };
return new DataStream<Order>(orders.ToAsyncEnumerable());
}
}
public class TaxCalculator : TransformNode<Order, ProcessedOrder>
{
public override Task<ProcessedOrder> TransformAsync(Order order, PipelineContext context, CancellationToken cancellationToken)
{
var tax = order.Amount * 0.08m;
var total = order.Amount + tax;
return Task.FromResult(new ProcessedOrder(order.Id, order.Customer, order.Amount, tax, total));
}
}
public class OrderSink : SinkNode<ProcessedOrder>
{
public override async Task ConsumeAsync(IDataStream<ProcessedOrder> input, PipelineContext context, CancellationToken cancellationToken)
{
await foreach (var order in input.WithCancellation(cancellationToken))
{
Console.WriteLine($"Order {order.Id}: {order.Customer} - Total: ${order.Total:F2}");
}
}
}
// 3. Define the pipeline
public class OrderPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<OrderSource, Order>("Order Source");
var calculator = builder.AddTransform<TaxCalculator, Order, ProcessedOrder>("Tax Calculator");
var sink = builder.AddSink<OrderSink, ProcessedOrder>("Order Sink");
builder.Connect(source, calculator);
builder.Connect(calculator, sink);
}
}
// 4. Run the pipeline
var services = new ServiceCollection()
.AddNPipeline(typeof(Program).Assembly)
.BuildServiceProvider();
await services.RunPipelineAsync<OrderPipeline>();
Available Packages
NPipeline is modular - install only what you need.
Core Packages
| Package | Purpose |
|---|---|
| NPipeline | Core streaming pipeline functionality (zero dependencies) |
| NPipeline.Analyzers | Roslyn analyzers for compile-time validation, style guidance, and developer guardrails |
| NPipeline.Extensions.DependencyInjection | Integration with Microsoft DI container |
| NPipeline.Extensions.Testing | Core testing utilities with in-memory nodes, test harness, and assertion helpers |
Key Extensions
| Package | Purpose |
|---|---|
| NPipeline.Extensions.Composition | Pipeline composition for treating entire pipelines as nodes within larger pipelines |
| NPipeline.Extensions.Lineage | Data lineage tracking and provenance for governance, auditing, and debugging |
| NPipeline.Extensions.Nodes | Ready-made, granular nodes for common data processing tasks (cleansing, validation, transformation, enrichment, filtering) |
| NPipeline.Extensions.Observability | Observability and telemetry support for pipeline monitoring |
| NPipeline.Extensions.Parallelism | Parallel processing with TPL Dataflow and configurable backpressure strategies |
Connectors
| Package | Purpose |
|---|---|
| NPipeline.Connectors | Core abstractions for data format/storage decoupling |
| NPipeline.Connectors.Csv | CSV source and sink nodes |
| NPipeline.Connectors.Excel | Excel source and sink nodes for XLS and XLSX files |
| NPipeline.Connectors.Json | JSON source and sink nodes for JSON array and NDJSON formats |
| NPipeline.Connectors.Postgres | PostgreSQL database source and sink nodes |
| NPipeline.Connectors.SqlServer | SQL Server database source and sink nodes |
| NPipeline.Connectors.Aws.Sqs | AWS SQS source and sink nodes for message queue processing |
| NPipeline.Connectors.Kafka | Kafka source and sink nodes with batching and transactional support |
| NPipeline.Connectors.RabbitMQ | RabbitMQ source and sink nodes with publisher confirms and dead-lettering |
... And More
For the complete list of all available packages see the Extensions Documentation and Connectors Documentation.
What's Next?
- Installation - Set up NPipeline in your project
- Your First Pipeline - Build a complete pipeline step-by-step
- Key Concepts - Learn the fundamentals
Contributing
Contributions are welcome! Please see our Contributing Guide for details.
License
MIT License - see LICENSE file for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.8)
-
net8.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.8)
-
net9.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.8)
NuGet packages (21)
Showing the top 5 NuGet packages that depend on NPipeline:
| Package | Downloads |
|---|---|
|
NPipeline.StorageProviders
Storage provider abstractions and implementations for NPipeline - interfaces, models, and base classes for storage operations |
|
|
NPipeline.Connectors
Extensible storage abstraction layer for NPipeline - supports pluggable storage providers (file system, cloud storage, etc.) |
|
|
NPipeline.Extensions.Testing
Testing utilities for NPipeline including in-memory nodes, test harness, and assertion helpers |
|
|
NPipeline.Extensions.Parallelism
High-performance parallel execution strategies for NPipeline with configurable backpressure and queue policies |
|
|
NPipeline.Extensions.DependencyInjection
Microsoft.Extensions.DependencyInjection integration for NPipeline - enables automatic node registration and lifetime management |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.52.0 | 100 | 5/30/2026 |
| 0.51.1 | 142 | 5/29/2026 |
| 0.51.0 | 132 | 5/29/2026 |
| 0.50.0 | 181 | 5/29/2026 |
| 0.49.3 | 324 | 5/28/2026 |
| 0.49.2 | 415 | 5/27/2026 |
| 0.49.1 | 419 | 5/27/2026 |
| 0.49.0 | 816 | 5/25/2026 |
| 0.48.3 | 819 | 5/22/2026 |
| 0.48.2 | 796 | 5/19/2026 |
| 0.48.1 | 791 | 5/17/2026 |
| 0.48.0 | 789 | 5/17/2026 |
| 0.47.0 | 778 | 5/16/2026 |
| 0.46.0 | 783 | 5/16/2026 |
| 0.45.0 | 787 | 5/15/2026 |
| 0.44.0 | 767 | 5/14/2026 |
| 0.43.0 | 768 | 5/14/2026 |
| 0.42.0 | 801 | 5/8/2026 |
| 0.41.0 | 725 | 4/30/2026 |
| 0.40.3 | 735 | 4/27/2026 |