NPipeline.Extensions.Observability 0.15.0

NPipeline.Extensions.Observability

Comprehensive observability, metrics collection, and distributed tracing for NPipeline pipelines.

Features

  • Automatic metrics collection - Node timing and lifecycle metrics are captured automatically
  • Distributed tracing abstraction - Core PipelineActivity wrapper for building custom tracing solutions
  • OpenTelemetry integration - Dedicated NPipeline.Extensions.Observability.OpenTelemetry sub-package for seamless OpenTelemetry backend export
  • Thread-safe metrics collection for concurrent pipeline execution
  • Node-level metrics tracking execution time, throughput, memory usage, and more
  • Pipeline-level metrics aggregating performance across all nodes
  • Flexible sink architecture for logging, monitoring, or custom integrations
  • Seamless DI integration with Microsoft.Extensions.DependencyInjection
  • Per-node observability configuration using fluent WithObservability() extension methods

Installation

dotnet add package NPipeline.Extensions.Observability

For OpenTelemetry backend integration, also install:

dotnet add package NPipeline.Extensions.Observability.OpenTelemetry

Requirements

  • .NET 8.0, 9.0, or 10.0
  • Microsoft.Extensions.DependencyInjection.Abstractions 10.0.0 or later
  • Microsoft.Extensions.Logging.Abstractions 10.0.0 or later
  • NPipeline core package

Quick Start

The simplest way to enable automatic observability is to use IObservablePipelineContextFactory:

using Microsoft.Extensions.DependencyInjection;
using NPipeline.Observability;
using NPipeline.Observability.DependencyInjection;

// Register observability services - this wires everything up automatically
services.AddNPipelineObservability();

// In your pipeline execution code:
await using var scope = host.Services.CreateAsyncScope();
var runner = scope.ServiceProvider.GetRequiredService<IPipelineRunner>();

// Create a context with observability pre-configured
var contextFactory = scope.ServiceProvider.GetRequiredService<IObservablePipelineContextFactory>();
await using var context = contextFactory.Create();

// Run your pipeline - metrics are collected automatically and emitted to sinks!
await runner.RunAsync<MyPipeline>(context);

// Note: Metrics are automatically emitted to registered sinks when the pipeline completes.
// You can also retrieve collected metrics directly:
var collector = scope.ServiceProvider.GetRequiredService<IObservabilityCollector>();
var nodeMetrics = collector.GetNodeMetrics();

Automatic Metrics Emission

When you run a pipeline with observability enabled, metrics are automatically emitted to all registered sinks:

  • On successful completion: All node and pipeline metrics are emitted to sinks
  • On failure: Metrics are emitted including the exception that caused the failure
  • Sinks are invoked asynchronously: All registered IMetricsSink and IPipelineMetricsSink implementations receive metrics

The built-in LoggingMetricsSink and LoggingPipelineMetricsSink will log metrics to your configured logger automatically.

Core Components

IObservablePipelineContextFactory

Factory for creating pipeline contexts with observability pre-configured. The created context automatically has its ExecutionObserver set to MetricsCollectingExecutionObserver, enabling automatic metrics collection during pipeline execution.

IObservabilityCollector

Thread-safe collector for metrics during pipeline execution. Records:

  • Node start/end times
  • Item processing counts
  • Retry attempts
  • Performance metrics (throughput, memory, CPU)

Important: The collector automatically emits metrics to all registered sinks when the pipeline completes or fails.

IMetricsSink

Interface for consuming node-level metrics. Implement to integrate with:

  • Logging frameworks (built-in LoggingMetricsSink)
  • Monitoring systems (Prometheus, Application Insights, etc.)
  • Custom metric storage

All registered sinks are automatically invoked when metrics are emitted.

IPipelineMetricsSink

Interface for consuming pipeline-level metrics aggregated across all nodes.

MetricsCollectingExecutionObserver

IExecutionObserver implementation that automatically collects metrics from pipeline events. This is automatically wired up when using AddNPipelineObservability().

IAutoObservabilityScope

Provides automatic metrics recording for nodes that have observability enabled. Automatically tracks:

  • Items processed and emitted
  • Failures and errors
  • Performance metrics

This scope is automatically created when you use .WithObservability() on a node.

Per-Node Observability Configuration

You can configure observability options for individual nodes using the WithObservability() extension method:

public class MyPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource<MySource, int>();

        // Configure with default options (timing, item counts, thread info, performance metrics)
        var transform = builder.AddTransform<MyTransform, int, string>()
            .WithObservability(builder);

        // Configure with full options (includes memory tracking)
        var sink = builder.AddSink<MySink, string>()
            .WithObservability(builder, ObservabilityOptions.Full);

        builder.Connect(source, transform);
        builder.Connect(transform, sink);
    }
}

How it works: When you use .WithObservability(), the system:

  1. Stores the observability options in the node's execution annotations
  2. Creates an IAutoObservabilityScope when the node starts executing
  3. Automatically records item counts, failures, and performance metrics
  4. Disposes the scope when the node completes, ensuring all metrics are captured
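
The scope lifecycle in the steps above can be illustrated with a small, self-contained sketch. It does not use the library's IAutoObservabilityScope; SketchNodeScope and its callback are hypothetical names chosen for illustration, and the real scope may record more than this.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical stand-in for a per-node scope: starts timing on creation,
// counts items while the node runs, and reports once when disposed.
public sealed class SketchNodeScope : IDisposable
{
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
    private readonly string _nodeId;
    private readonly Action<string, long, long, TimeSpan> _onCompleted;
    private long _processed;
    private long _failed;
    private int _disposed;

    public SketchNodeScope(string nodeId, Action<string, long, long, TimeSpan> onCompleted)
    {
        _nodeId = nodeId;
        _onCompleted = onCompleted;
    }

    // Interlocked keeps the counters correct if items are handled on several threads.
    public void ItemProcessed() => Interlocked.Increment(ref _processed);

    public void ItemFailed() => Interlocked.Increment(ref _failed);

    public void Dispose()
    {
        // Report exactly once, even if Dispose is called repeatedly.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        _stopwatch.Stop();
        _onCompleted(_nodeId, Interlocked.Read(ref _processed),
            Interlocked.Read(ref _failed), _stopwatch.Elapsed);
    }
}
```

Wrapping node execution in a using block around such a scope means the final counts are reported even when the node body throws, which is the property step 4 relies on.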

ObservabilityOptions Presets

Preset     Timing   Item Counts   Memory   Thread Info   Performance
Default    Yes      Yes           No       Yes           Yes
Full       Yes      Yes           Yes      Yes           Yes
Minimal
Disabled   No       No            No       No            No

Configuration Options

Default Configuration (Logging Sinks)

services.AddNPipelineObservability();

Custom Sink Types

services.AddNPipelineObservability<CustomMetricsSink, CustomPipelineMetricsSink>();

Factory Delegates

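// Resolve ILogger<T>: the non-generic ILogger is not registered by the default logging setup
services.AddNPipelineObservability(
    sp => new CustomMetricsSink(sp.GetRequiredService<ILogger<CustomMetricsSink>>()),
    sp => new CustomPipelineMetricsSink(sp.GetRequiredService<ILogger<CustomPipelineMetricsSink>>()));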

Custom Collector

services.AddNPipelineObservability<CustomCollector, CustomMetricsSink, CustomPipelineMetricsSink>();

Metrics Collected

Node Metrics

  • NodeId: Unique identifier
  • StartTime/EndTime: Execution timestamps
  • DurationMs: Execution duration in milliseconds
  • Success: Whether execution succeeded
  • ItemsProcessed/ItemsEmitted: Item counts (automatically tracked when observability is enabled)
  • RetryCount: Number of retry attempts (thread-safe, uses atomic operations)
  • ThroughputItemsPerSec: Processing throughput
  • AverageItemProcessingMs: Average time per item in milliseconds
  • PeakMemoryUsageMb: Peak memory usage during node execution (per-node delta, not global process memory)
  • ProcessorTimeMs: CPU time consumed (not available per-node; only available at process level)
  • ThreadId: Executing thread ID
  • Exception: Any error that occurred
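
The two derived values in this list, ThroughputItemsPerSec and AverageItemProcessingMs, can be related to the raw counts with simple arithmetic. The sketch below assumes they are computed from ItemsProcessed and DurationMs; the library's exact formulas are not stated here, and DerivedMetricsSketch is a hypothetical helper.

```csharp
public static class DerivedMetricsSketch
{
    // Items per second: item count divided by the duration expressed in seconds.
    public static double ThroughputItemsPerSec(long itemsProcessed, double durationMs) =>
        durationMs > 0 ? itemsProcessed / (durationMs / 1000.0) : 0.0;

    // Average milliseconds per item: duration divided by item count.
    public static double AverageItemProcessingMs(long itemsProcessed, double durationMs) =>
        itemsProcessed > 0 ? durationMs / itemsProcessed : 0.0;
}
```

Under that assumption, a node that processes 500 items in 2000 ms has a throughput of 250 items per second and an average of 4 ms per item. Both helpers return 0 instead of dividing by zero when a node processed nothing or finished instantly.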

Pipeline Metrics

  • PipelineName: Pipeline identifier
  • RunId: Unique run identifier (GUID)
  • StartTime/EndTime: Pipeline execution timestamps
  • DurationMs: Total execution duration
  • Success: Overall pipeline success
  • TotalItemsProcessed: Sum of all items processed
  • NodeMetrics: Collection of node-level metrics
  • Exception: Any pipeline-level error

Memory Metrics Details

Memory metrics are measured as per-node deltas using GC.GetTotalMemory(false), not global process memory:

  • Each node gets its own memory usage measurement
  • Memory is measured at node start and end using GC.GetTotalMemory(false)
  • The difference (final memory - initial memory) represents the memory allocated during that specific node's execution
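  • This gives an approximate per-node figure: GC.GetTotalMemory reports the whole managed heap, so allocations by concurrently running nodes and garbage collections during execution can skew the delta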
  • Note: This measures managed memory allocations, not total process memory or peak working set

Important: Memory metrics are only collected when:

  1. The extension is configured with EnableMemoryMetrics = true (via ObservabilityExtensionOptions)
  2. The node has ObservabilityOptions.RecordMemoryUsage = true set
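
The measurement and the two-level gate described above can be sketched as follows. This is a minimal illustration, not the library's implementation: MemoryDeltaSketch and its parameters are hypothetical, and the two booleans stand in for EnableMemoryMetrics and RecordMemoryUsage.

```csharp
using System;

public static class MemoryDeltaSketch
{
    // Returns the managed-heap delta in bytes around `work`,
    // or null when either the extension-level or node-level switch is off.
    public static long? Measure(bool extensionEnabled, bool nodeEnabled, Action work)
    {
        if (!(extensionEnabled && nodeEnabled))
        {
            work();
            return null; // memory tracking skipped entirely
        }

        // false = do not force a collection first; cheaper but less precise.
        long before = GC.GetTotalMemory(forceFullCollection: false);
        work();
        long after = GC.GetTotalMemory(forceFullCollection: false);

        return after - before;
    }
}
```

A call such as MemoryDeltaSketch.Measure(true, true, () => RunNode()) would return a byte delta, where RunNode is a placeholder for the node's work. The value can be negative if a garbage collection runs while the work executes, so it is best read as an indication of managed allocation rather than an exact figure.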

Best Practices

  1. Use scoped collector: Registered as scoped to ensure isolation per pipeline run
  2. Implement async sinks: Use the Task-returning RecordAsync method for I/O-bound metric operations
  3. Handle failures gracefully: Sinks should not throw exceptions that could disrupt pipelines
  4. Consider performance: Metrics collection adds overhead; profile in production scenarios
  5. Aggregate at pipeline level: Use CreatePipelineMetrics to get comprehensive run summaries
  6. Enable memory tracking selectively: Memory metrics add overhead; use ObservabilityOptions.Full only when needed
  7. Trust automatic emission: Metrics are automatically emitted to sinks on pipeline completion; no manual emission needed
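
Practices 2 and 3 can be combined in a small wrapper that awaits an export call and swallows its failures. The sketch below is self-contained and deliberately does not implement the library's IMetricsSink, whose exact signature is not shown here; SketchNodeMetrics, ISketchMetricsSink, and SafeSink are hypothetical stand-ins.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical metrics shape, reduced to three of the fields listed above.
public sealed record SketchNodeMetrics(string NodeId, long DurationMs, long ItemsProcessed);

// Hypothetical stand-in for a node-level sink interface.
public interface ISketchMetricsSink
{
    Task RecordAsync(SketchNodeMetrics metrics, CancellationToken cancellationToken);
}

// Awaits the export delegate and routes failures to a callback instead of the pipeline.
public sealed class SafeSink : ISketchMetricsSink
{
    private readonly Func<SketchNodeMetrics, CancellationToken, Task> _export;
    private readonly Action<Exception> _onError;

    public SafeSink(Func<SketchNodeMetrics, CancellationToken, Task> export, Action<Exception> onError)
    {
        _export = export;
        _onError = onError;
    }

    public async Task RecordAsync(SketchNodeMetrics metrics, CancellationToken cancellationToken)
    {
        try
        {
            await _export(metrics, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Export failures are reported but never rethrown; cancellation still propagates.
            _onError(ex);
        }
    }
}
```

Passing the export step as a delegate keeps the wrapper independent of any particular backend, so the same guard can sit in front of an HTTP call, a queue write, or a file append.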

Thread Safety

All collector implementations are thread-safe and designed for concurrent use. The ObservabilityCollector uses:

  • ConcurrentDictionary for node metrics storage
  • Interlocked operations for counter updates
  • Interlocked.CompareExchange loop for thread-safe retry counting (prevents lost updates under high contention)

This ensures accurate metrics collection even when multiple nodes execute concurrently or when multiple threads update the same node's metrics.
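
The combination of ConcurrentDictionary and an Interlocked.CompareExchange loop can be sketched with BCL types only. The example assumes the stored retry count is the highest attempt number seen for a node; whether the library keeps a maximum or a running total is not stated here, and RetryCounterSketch is a hypothetical name.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public sealed class RetryCounterSketch
{
    // Mutable holder so the int can be updated in place by reference.
    private sealed class Counter { public int Value; }

    private readonly ConcurrentDictionary<string, Counter> _counters = new();

    // Raises the stored retry count to `attempt` if it is larger than the current value.
    public void RecordRetry(string nodeId, int attempt)
    {
        var counter = _counters.GetOrAdd(nodeId, _ => new Counter());
        int observed;
        do
        {
            observed = Volatile.Read(ref counter.Value);
            if (attempt <= observed) return; // another thread already stored a higher value
        }
        // Retry when another thread changed the value between the read and the swap.
        while (Interlocked.CompareExchange(ref counter.Value, attempt, observed) != observed);
    }

    public int GetRetryCount(string nodeId) =>
        _counters.TryGetValue(nodeId, out var counter) ? Volatile.Read(ref counter.Value) : 0;
}
```

A plain read followed by a write could let a lower attempt number overwrite a higher one when two threads race. The compare-and-swap only stores the new value if nothing changed since the read, which is what prevents the lost update.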

Performance Considerations

  • Metrics collection overhead is typically < 1% for most pipelines
  • Memory overhead is proportional to the number of nodes (~ 1KB per node)
  • Sink implementations should be async for I/O operations
  • Memory metrics use GC.GetTotalMemory(false) which adds minimal overhead per node when enabled
  • Memory metrics are only collected when both extension-level (EnableMemoryMetrics) and node-level (RecordMemoryUsage) options are enabled
  • Consider sampling or filtering in high-throughput scenarios
  • Disable memory tracking (ObservabilityOptions.Default instead of Full) if not needed to reduce overhead

Troubleshooting

Memory Metrics Not Collected

Problem: Memory metrics are not appearing in collected data.

Solutions:

  1. Verify memory metrics are enabled at extension level: services.AddNPipelineObservability(ObservabilityExtensionOptions.WithMemoryMetrics)
  2. Ensure nodes have memory tracking enabled: .WithObservability(builder, ObservabilityOptions.Full) or set RecordMemoryUsage = true
  3. Memory metrics require both extension-level AND node-level configuration to be enabled

Metrics Not Appearing

Problem: Metrics are not being logged or sent to external systems.

Solutions:

  1. Verify observability is registered: services.AddNPipelineObservability()
  2. Check that the pipeline is using IObservablePipelineContextFactory to create the context
  3. Ensure logging is configured properly for LoggingMetricsSink
  4. Verify sink implementations are not throwing exceptions

Performance Degradation

Problem: Pipeline execution slows down when observability is enabled.

Solutions:

  1. Use async sink implementations
  2. Implement batching or aggregation for external calls
  3. Disable memory tracking (ObservabilityOptions.Default instead of Full) if not needed
  4. Consider sampling for high-volume scenarios
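
For solution 4, one simple form of sampling is to forward only every Nth metric record to an expensive sink. The sketch below is an assumption about how that could be done in user code, not a feature of the library; EveryNthSampler is a hypothetical helper built on Interlocked.

```csharp
using System;
using System.Threading;

public sealed class EveryNthSampler
{
    private readonly int _interval;
    private long _seen;

    public EveryNthSampler(int interval)
    {
        if (interval < 1) throw new ArgumentOutOfRangeException(nameof(interval));
        _interval = interval;
    }

    // True for call 1, N+1, 2N+1, and so on; safe to call from multiple threads.
    public bool ShouldRecord() => (Interlocked.Increment(ref _seen) - 1) % _interval == 0;
}
```

A custom sink could call ShouldRecord() at the top of its record method and return early when it is false. With an interval of 10, roughly one record in ten reaches the backend, at the cost of losing per-item detail for the skipped records.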

Distributed Tracing

The extension includes PipelineActivity, an implementation of IPipelineActivity that wraps System.Diagnostics.Activity for OpenTelemetry-compatible distributed tracing:

using System.Diagnostics;
using NPipeline.Extensions.Observability.Tracing;
using NPipeline.Observability.Tracing;

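// Implement IPipelineTracer
public class DistributedTracer : IPipelineTracer
{
    // The source name must match the name passed to AddSource(...) on the tracer provider
    private static readonly ActivitySource Source = new("MyPipeline");

    public IPipelineActivity StartActivity(string name)
    {
        // StartActivity returns null when no listener is sampling this source
        var activity = Source.StartActivity(name);
        return activity != null
            ? new PipelineActivity(activity)
            : new NullPipelineActivity();
    }
}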

// Register with DI
services.AddSingleton<IPipelineTracer>(new DistributedTracer());

// Use in pipeline context
var context = new PipelineContext(
    PipelineContextConfiguration.WithObservability(
        tracer: provider.GetRequiredService<IPipelineTracer>()
    )
);

OpenTelemetry Integration

Export traces to OpenTelemetry backends like Jaeger or Zipkin:

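using OpenTelemetry;
using OpenTelemetry.Trace;

// TracerProviderBuilder is abstract; create one through the SDK entry point.
// AddJaegerExporter requires the OpenTelemetry.Exporter.Jaeger package.
var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("MyPipeline")
    .AddJaegerExporter()
    .Build();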
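
As background for the source name registered above: in System.Diagnostics, an ActivitySource only produces activities when some listener has subscribed to it by name, which is the role a tracer provider plays through AddSource. The sketch below shows that behaviour with BCL types only; ActivitySourceSketch and the source name "Sketch.Pipeline" are hypothetical.

```csharp
using System.Diagnostics;

public static class ActivitySourceSketch
{
    // Reports whether an activity was created before and after a listener subscribed.
    public static (bool WithoutListener, bool WithListener) Run()
    {
        using var source = new ActivitySource("Sketch.Pipeline");

        // No listener yet, so StartActivity is expected to return null.
        bool withoutListener;
        using (var before = source.StartActivity("node"))
        {
            withoutListener = before != null;
        }

        // Subscribe by source name and ask for all data to be recorded.
        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == "Sketch.Pipeline",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData
        };
        ActivitySource.AddActivityListener(listener);

        using var after = source.StartActivity("node");
        return (withoutListener, after != null);
    }
}
```

The null result in the first case is why tracer code typically checks the activity for null and falls back to a no-op implementation when nothing is listening.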

License

MIT License - see LICENSE file for details.



Version Downloads Last Updated
0.16.0 0 2/24/2026
0.15.0 37 2/19/2026
0.14.0 55 2/17/2026
0.13.1 96 2/13/2026
0.13.0 98 2/13/2026
0.12.0 101 2/9/2026
0.11.0 106 2/8/2026
0.10.0 103 2/6/2026
0.9.1 115 2/5/2026
0.9.0 106 2/5/2026
0.8.0 104 2/3/2026
0.7.1 105 2/1/2026
0.7.0 116 1/31/2026
0.6.6 108 1/21/2026
0.6.5 109 1/19/2026
0.6.4 106 1/18/2026
0.6.3 106 1/14/2026
0.6.2 111 1/13/2026
0.6.1 109 1/13/2026