NPipeline.Extensions.Observability
0.15.0
Comprehensive observability, metrics collection, and distributed tracing for NPipeline pipelines.
Features
- Automatic metrics collection - Node timing and lifecycle metrics are captured automatically
- Distributed tracing abstraction - Core PipelineActivity wrapper for building custom tracing solutions
- OpenTelemetry integration - Dedicated NPipeline.Extensions.Observability.OpenTelemetry sub-package for seamless OpenTelemetry backend export
- Thread-safe metrics collection for concurrent pipeline execution
- Node-level metrics tracking execution time, throughput, memory usage, and more
- Pipeline-level metrics aggregating performance across all nodes
- Flexible sink architecture for logging, monitoring, or custom integrations
- Seamless DI integration with Microsoft.Extensions.DependencyInjection
- Per-node observability configuration using fluent WithObservability() extension methods
Installation
dotnet add package NPipeline.Extensions.Observability
For OpenTelemetry backend integration, also install:
dotnet add package NPipeline.Extensions.Observability.OpenTelemetry
Requirements
- .NET 8.0, 9.0, or 10.0
- Microsoft.Extensions.DependencyInjection.Abstractions 10.0.0 or later
- Microsoft.Extensions.Logging.Abstractions 10.0.0 or later
- NPipeline core package
Quick Start
The simplest way to enable automatic observability is to use IObservablePipelineContextFactory:
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Observability;
using NPipeline.Observability.DependencyInjection;
// Register observability services - this wires everything up automatically
services.AddNPipelineObservability();
// In your pipeline execution code:
await using var scope = host.Services.CreateAsyncScope();
var runner = scope.ServiceProvider.GetRequiredService<IPipelineRunner>();
// Create a context with observability pre-configured
var contextFactory = scope.ServiceProvider.GetRequiredService<IObservablePipelineContextFactory>();
await using var context = contextFactory.Create();
// Run your pipeline - metrics are collected automatically and emitted to sinks!
await runner.RunAsync<MyPipeline>(context);
// Note: Metrics are automatically emitted to registered sinks when the pipeline completes.
// You can also retrieve collected metrics directly:
var collector = scope.ServiceProvider.GetRequiredService<IObservabilityCollector>();
var nodeMetrics = collector.GetNodeMetrics();
Automatic Metrics Emission
When you run a pipeline with observability enabled, metrics are automatically emitted to all registered sinks:
- On successful completion: All node and pipeline metrics are emitted to sinks
- On failure: Metrics are emitted including the exception that caused the failure
- Sinks are invoked asynchronously: All registered IMetricsSink and IPipelineMetricsSink implementations receive metrics
The built-in LoggingMetricsSink and LoggingPipelineMetricsSink will log metrics to your configured logger automatically.
Core Components
IObservablePipelineContextFactory
Factory for creating pipeline contexts with observability pre-configured. The created context
automatically has its ExecutionObserver set to MetricsCollectingExecutionObserver,
enabling automatic metrics collection during pipeline execution.
IObservabilityCollector
Thread-safe collector for metrics during pipeline execution. Records:
- Node start/end times
- Item processing counts
- Retry attempts
- Performance metrics (throughput, memory, CPU)
Important: The collector automatically emits metrics to all registered sinks when the pipeline completes or fails.
IMetricsSink
Interface for consuming node-level metrics. Implement to integrate with:
- Logging frameworks (built-in LoggingMetricsSink)
- Monitoring systems (Prometheus, Application Insights, etc.)
- Custom metric storage
All registered sinks are automatically invoked when metrics are emitted.
IPipelineMetricsSink
Interface for consuming pipeline-level metrics aggregated across all nodes.
MetricsCollectingExecutionObserver
IExecutionObserver implementation that automatically collects metrics from pipeline events.
This is automatically wired up when using AddNPipelineObservability().
IAutoObservabilityScope
Provides automatic metrics recording for nodes that have observability enabled. Automatically tracks:
- Items processed and emitted
- Failures and errors
- Performance metrics
This scope is automatically created when you use .WithObservability() on a node.
Per-Node Observability Configuration
You can configure observability options for individual nodes using the WithObservability() extension method:
public class MyPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource<MySource, int>();

        // Configure with default options (timing, item counts, thread info, performance metrics)
        var transform = builder.AddTransform<MyTransform, int, string>()
            .WithObservability(builder);

        // Configure with full options (includes memory tracking)
        var sink = builder.AddSink<MySink, string>()
            .WithObservability(builder, ObservabilityOptions.Full);

        builder.Connect(source, transform);
        builder.Connect(transform, sink);
    }
}
How it works: When you use .WithObservability(), the system:
- Stores the observability options in the node's execution annotations
- Creates an IAutoObservabilityScope when the node starts executing
- Automatically records item counts, failures, and performance metrics
- Disposes the scope when the node completes, ensuring all metrics are captured
ObservabilityOptions Presets
| Preset | Timing | Item Counts | Memory | Thread Info | Performance |
|---|---|---|---|---|---|
| Default | ✓ | ✓ | ✗ | ✓ | ✓ |
| Full | ✓ | ✓ | ✓ | ✓ | ✓ |
| Minimal | ✓ | ✗ | ✗ | ✗ | ✗ |
| Disabled | ✗ | ✗ | ✗ | ✗ | ✗ |
Configuration Options
Default Configuration (Logging Sinks)
services.AddNPipelineObservability();
Custom Sink Types
services.AddNPipelineObservability<CustomMetricsSink, CustomPipelineMetricsSink>();
Factory Delegates
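// The default logging setup registers ILogger<T> (not the non-generic ILogger),
// so resolve the typed logger for each sink.
services.AddNPipelineObservability(
    sp => new CustomMetricsSink(sp.GetRequiredService<ILogger<CustomMetricsSink>>()),
    sp => new CustomPipelineMetricsSink(sp.GetRequiredService<ILogger<CustomPipelineMetricsSink>>()));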
Custom Collector
services.AddNPipelineObservability<CustomCollector, CustomMetricsSink, CustomPipelineMetricsSink>();
Metrics Collected
Node Metrics
- NodeId: Unique identifier
- StartTime/EndTime: Execution timestamps
- DurationMs: Execution duration in milliseconds
- Success: Whether execution succeeded
- ItemsProcessed/ItemsEmitted: Item counts (automatically tracked when observability is enabled)
- RetryCount: Number of retry attempts (thread-safe, uses atomic operations)
- ThroughputItemsPerSec: Processing throughput
- AverageItemProcessingMs: Average time per item in milliseconds
- PeakMemoryUsageMb: Peak memory usage during node execution (per-node delta, not global process memory)
- ProcessorTimeMs: CPU time consumed (not available per-node; only available at process level)
- ThreadId: Executing thread ID
- Exception: Any error that occurred
Pipeline Metrics
- PipelineName: Pipeline identifier
- RunId: Unique run identifier (GUID)
- StartTime/EndTime: Pipeline execution timestamps
- DurationMs: Total execution duration
- Success: Overall pipeline success
- TotalItemsProcessed: Sum of all items processed
- NodeMetrics: Collection of node-level metrics
- Exception: Any pipeline-level error
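The derived fields above (ThroughputItemsPerSec, AverageItemProcessingMs, TotalItemsProcessed) can be related to the raw counts and durations with simple arithmetic. The sketch below is an assumption about how such values are typically computed, not a statement of the library's internal formulas; the helper names and sample numbers are hypothetical.

```csharp
using System;
using System.Linq;

// Hypothetical helpers: plausible formulas, not taken from the library source.
static double ThroughputItemsPerSec(long itemsProcessed, double durationMs) =>
    durationMs > 0 ? itemsProcessed / (durationMs / 1000.0) : 0.0;

static double AverageItemProcessingMs(long itemsProcessed, double durationMs) =>
    itemsProcessed > 0 ? durationMs / itemsProcessed : 0.0;

// Made-up sample data: (node id, items processed, duration in milliseconds).
var nodes = new (string NodeId, long ItemsProcessed, double DurationMs)[]
{
    ("source", 1_000, 250.0),
    ("transform", 1_000, 500.0),
};

foreach (var n in nodes)
{
    double throughput = ThroughputItemsPerSec(n.ItemsProcessed, n.DurationMs);
    double averageMs = AverageItemProcessingMs(n.ItemsProcessed, n.DurationMs);
    Console.WriteLine($"{n.NodeId}: {throughput} items/s, {averageMs} ms/item");
}

// Pipeline-level total as a plain sum over node counts.
long totalItemsProcessed = nodes.Sum(n => n.ItemsProcessed);
Console.WriteLine($"total items: {totalItemsProcessed}");
```

With these sample numbers the source node works out to 4000 items per second at 0.25 ms per item, and the total is 2000 items.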
Memory Metrics Details
Memory metrics are measured as per-node deltas using GC.GetTotalMemory(false), not global process memory:
- Each node gets its own memory usage measurement
- Memory is measured at node start and end using GC.GetTotalMemory(false)
- The difference (final memory - initial memory) represents the memory allocated during that specific node's execution
- This provides accurate, isolated memory tracking per node
- Note: This measures managed memory allocations, not total process memory or peak working set
Important: Memory metrics are only collected when:
- The extension is configured with EnableMemoryMetrics = true (via ObservabilityExtensionOptions)
- The node has ObservabilityOptions.RecordMemoryUsage = true set
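The start/end delta technique and the two-level switch described above can be sketched in isolation. This is an illustration only: the two boolean flags are hypothetical stand-ins for EnableMemoryMetrics and RecordMemoryUsage, and MeasureManagedDeltaMb is not a library method. Note that the delta can be small or even negative if a garbage collection runs between the two readings.

```csharp
using System;

// Hypothetical stand-ins for the extension-level and node-level switches.
bool enableMemoryMetrics = true;
bool recordMemoryUsage = true;

// Reads managed heap size before and after the work and returns the difference in MB.
static double MeasureManagedDeltaMb(Action nodeWork)
{
    long before = GC.GetTotalMemory(forceFullCollection: false);
    nodeWork();
    long after = GC.GetTotalMemory(forceFullCollection: false);
    return (after - before) / (1024.0 * 1024.0);
}

byte[][] retained = Array.Empty<byte[]>();
double? deltaMb = null;

// Both switches must be on before anything is measured.
if (enableMemoryMetrics && recordMemoryUsage)
{
    deltaMb = MeasureManagedDeltaMb(() =>
    {
        // Simulated node work: allocate and keep about 4 MB of managed memory.
        retained = new byte[64][];
        for (int i = 0; i < retained.Length; i++)
        {
            retained[i] = new byte[64 * 1024];
        }
    });
}

Console.WriteLine(deltaMb.HasValue
    ? $"managed delta: {deltaMb.Value:F2} MB"
    : "memory metrics disabled");
GC.KeepAlive(retained);
```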
Best Practices
- Use scoped collector: Registered as scoped to ensure isolation per pipeline run
- Implement async sinks: Use Task RecordAsync for I/O-bound metric operations
- Handle failures gracefully: Sinks should not throw exceptions that could disrupt pipelines
- Consider performance: Metrics collection adds overhead; profile in production scenarios
- Aggregate at pipeline level: Use CreatePipelineMetrics to get comprehensive run summaries
- Enable memory tracking selectively: Memory metrics add overhead; use ObservabilityOptions.Full only when needed
- Trust automatic emission: Metrics are automatically emitted to sinks on pipeline completion; no manual emission needed
Documentation
- Configuration Guide
- Usage Examples
- Metrics Reference
- Distributed Tracing
- Main Observability Documentation
Thread Safety
All collector implementations are thread-safe and designed for concurrent use. The ObservabilityCollector uses:
- ConcurrentDictionary for node metrics storage
- Interlocked operations for counter updates
- Interlocked.CompareExchange loop for thread-safe retry counting (prevents lost updates under high contention)
This ensures accurate metrics collection even when multiple nodes execute concurrently or when multiple threads update the same node's metrics.
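The primitives named above can be illustrated with a small standalone sketch. It is not the ObservabilityCollector implementation; the dictionary key, the RecordHighestRetry helper, and the choice to track the highest retry attempt seen are assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical per-node counter store keyed by node id.
var itemsProcessed = new ConcurrentDictionary<string, long>();
int highestRetry = 0;

// CompareExchange loop: publish `attempt` only if it is larger than the current value,
// retrying when another thread changed the value in between.
static void RecordHighestRetry(ref int location, int attempt)
{
    int observed;
    do
    {
        observed = Volatile.Read(ref location);
        if (attempt <= observed)
        {
            return;   // a larger value is already stored
        }
    }
    while (Interlocked.CompareExchange(ref location, attempt, observed) != observed);
}

Parallel.For(1, 1001, attempt =>
{
    // AddOrUpdate retries internally on contention, so no increment is lost.
    itemsProcessed.AddOrUpdate("transform", 1, (_, current) => current + 1);
    RecordHighestRetry(ref highestRetry, attempt);
});

Console.WriteLine($"items={itemsProcessed["transform"]}, highestRetry={highestRetry}");
// prints "items=1000, highestRetry=1000"
```

A plain read-compare-write without the loop could overwrite a larger value written by another thread between the read and the write, which is the lost-update case the loop avoids.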
Performance Considerations
- Metrics collection overhead is typically < 1% for most pipelines
- Memory overhead is proportional to the number of nodes (~ 1KB per node)
- Sink implementations should be async for I/O operations
- Memory metrics use GC.GetTotalMemory(false), which adds minimal overhead per node when enabled
- Memory metrics are only collected when both extension-level (EnableMemoryMetrics) and node-level (RecordMemoryUsage) options are enabled
- Consider sampling or filtering in high-throughput scenarios
- Disable memory tracking (ObservabilityOptions.Default instead of Full) if not needed to reduce overhead
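For the sampling suggestion above, one simple approach is to record only every Nth item. The sketch below shows the idea with a thread-safe counter; the sampling rate and variable names are hypothetical, and the package itself is not assumed to expose a sampling option.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int sampleEvery = 100;   // hypothetical rate: keep 1 item in 100
long seen = 0;
long recorded = 0;

Parallel.For(0, 10_000, _ =>
{
    // Interlocked.Increment hands each item a unique sequence number.
    long sequence = Interlocked.Increment(ref seen);
    if (sequence % sampleEvery == 0)
    {
        // Only sampled items would reach the (expensive) metrics path.
        Interlocked.Increment(ref recorded);
    }
});

Console.WriteLine($"seen={seen}, recorded={recorded}");
// prints "seen=10000, recorded=100"
```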
Troubleshooting
Memory Metrics Not Collected
Problem: Memory metrics are not appearing in collected data.
Solutions:
- Verify memory metrics are enabled at extension level: services.AddNPipelineObservability(ObservabilityExtensionOptions.WithMemoryMetrics)
- Ensure nodes have memory tracking enabled: .WithObservability(builder, ObservabilityOptions.Full) or set RecordMemoryUsage = true
- Memory metrics require both extension-level AND node-level configuration to be enabled
Metrics Not Appearing
Problem: Metrics are not being logged or sent to external systems.
Solutions:
- Verify observability is registered: services.AddNPipelineObservability()
- Check that the pipeline is using IObservablePipelineContextFactory to create the context
- Ensure logging is configured properly for LoggingMetricsSink
- Verify sink implementations are not throwing exceptions
Performance Degradation
Problem: Pipeline execution slows down when observability is enabled.
Solutions:
- Use async sink implementations
- Implement batching or aggregation for external calls
- Disable memory tracking (ObservabilityOptions.Default instead of Full) if not needed
- Consider sampling for high-volume scenarios
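The batching suggestion above can be sketched as a small buffer that forwards metrics to the external system in groups instead of one call per metric. Everything here is hypothetical (batch size, the Record and Flush helpers, string payloads); a real sink would also flush the remainder when the pipeline completes.

```csharp
using System;
using System.Collections.Generic;

const int batchSize = 50;                  // hypothetical batch size
var buffer = new List<string>(batchSize);
var gate = new object();
int flushes = 0;

// Stand-in for one call to an external monitoring backend.
void Flush(string[] batch) => flushes++;

void Record(string metric)
{
    string[] ready = Array.Empty<string>();
    lock (gate)
    {
        buffer.Add(metric);
        if (buffer.Count >= batchSize)
        {
            ready = buffer.ToArray();
            buffer.Clear();
        }
    }

    // Send outside the lock so slow I/O does not block other writers.
    if (ready.Length > 0)
    {
        Flush(ready);
    }
}

for (int i = 0; i < 120; i++)
{
    Record($"node-{i}");
}

Console.WriteLine($"flushes={flushes}, pending={buffer.Count}");
// prints "flushes=2, pending=20"
```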
Distributed Tracing
The extension includes PipelineActivity, an implementation of IPipelineActivity that wraps System.Diagnostics.Activity for OpenTelemetry-compatible
distributed tracing:
using System.Diagnostics;
using NPipeline.Extensions.Observability.Tracing;
using NPipeline.Observability.Tracing;
// Implement IPipelineTracer
public class DistributedTracer : IPipelineTracer
{
    public IPipelineActivity StartActivity(string name)
    {
        // Activity.Start() returns the same Activity instance, so the null
        // check below is purely defensive.
        var activity = new Activity(name).Start();
        return activity != null
            ? new PipelineActivity(activity)
            : new NullPipelineActivity();
    }
}
// Register with DI
services.AddSingleton<IPipelineTracer>(new DistributedTracer());
// Use in pipeline context
var context = new PipelineContext(
    PipelineContextConfiguration.WithObservability(
        tracer: provider.GetRequiredService<IPipelineTracer>()
    )
);
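Tracers built on System.Diagnostics often create activities through an ActivitySource rather than constructing Activity directly, because a source only produces an activity when something is listening, and that is the case where a null check matters. The sketch below uses only System.Diagnostics types; the source name "MyPipeline" and the activity and tag names are illustrative choices, and nothing here is taken from the package's own tracer.

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("MyPipeline");

// With no listener registered, StartActivity returns null.
var unobserved = source.StartActivity("node:transform");
Console.WriteLine($"without listener: {(unobserved is null ? "null" : "activity")}");

// Register a listener that samples everything from this source.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "MyPipeline",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
};
ActivitySource.AddActivityListener(listener);

// Now the same call yields a started activity that can carry tags.
using var observed = source.StartActivity("node:transform");
observed?.SetTag("node.id", "transform");
Console.WriteLine($"with listener: {(observed is null ? "null" : observed.OperationName)}");
```

An exporter SDK plays the listener role in a real application, which is why the source name passed to it has to match the name used when creating the ActivitySource.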
OpenTelemetry Integration
Export traces to OpenTelemetry backends like Jaeger or Zipkin:
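using OpenTelemetry;
using OpenTelemetry.Trace;

// TracerProviderBuilder is abstract; obtain a builder from the SDK entry point.
var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("MyPipeline")
    .AddJaegerExporter()
    .Build();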
License
MIT License - see LICENSE file for details.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.16.0 | 0 | 2/24/2026 |
| 0.15.0 | 37 | 2/19/2026 |
| 0.14.0 | 55 | 2/17/2026 |
| 0.13.1 | 96 | 2/13/2026 |
| 0.13.0 | 98 | 2/13/2026 |
| 0.12.0 | 101 | 2/9/2026 |
| 0.11.0 | 106 | 2/8/2026 |
| 0.10.0 | 103 | 2/6/2026 |
| 0.9.1 | 115 | 2/5/2026 |
| 0.9.0 | 106 | 2/5/2026 |
| 0.8.0 | 104 | 2/3/2026 |
| 0.7.1 | 105 | 2/1/2026 |
| 0.7.0 | 116 | 1/31/2026 |
| 0.6.6 | 108 | 1/21/2026 |
| 0.6.5 | 109 | 1/19/2026 |
| 0.6.4 | 106 | 1/18/2026 |
| 0.6.3 | 106 | 1/14/2026 |
| 0.6.2 | 111 | 1/13/2026 |
| 0.6.1 | 109 | 1/13/2026 |