OakIdeas.Omens.Middleware 0.1.7

OakIdeas.Omens.Middleware

Middleware abstractions and built-in implementations for OakIdeas.Omens: the extensibility interfaces for its message processing pipelines.

Overview

This package contains the core middleware interfaces and built-in middleware implementations that allow you to extend OakIdeas.Omens message processing with custom behavior such as logging, validation, transformation, correlation tracking, or error handling.

Features

  • IOmenMiddleware<T>: Interface for implementing custom subscribe-side message processing middleware
  • IPublishMiddleware<T>: Interface for implementing custom publish-side middleware
  • GlobalMiddlewareOptions: Configuration for global middleware applied to all operations
  • Built-in Middleware Implementations:
    • LoggingMiddleware<T> / LoggingPublishMiddleware<T>: Configurable logging for message processing
    • TimingMiddleware<T> / TimingPublishMiddleware<T>: Performance timing and slow message detection
    • CorrelationMiddleware<T> / CorrelationPublishMiddleware<T>: Distributed tracing with correlation IDs
    • PassthroughMiddleware<T> / PassthroughPublishMiddleware<T>: Base implementations for extension

Installation

dotnet add package OakIdeas.Omens.Middleware

Built-in Middleware

LoggingMiddleware

Logs message processing lifecycle events with configurable templates and log levels:

// Default logging to console
var defaultLogging = new LoggingMiddleware<string>();

// Custom logging configuration
// (_logger is assumed to be an ILogger already available in scope)
var options = new LoggingMiddlewareOptions
{
    Logger = message => _logger.LogInformation(message),
    LogOnBefore = true,
    LogOnAfter = true,
    LogOnError = true,
    BeforeMessageTemplate = "START: {Topic} - {MessageId}",
    AfterMessageTemplate = "END: {Topic} - {MessageId}",
    LogPrefix = "[MyApp]"
};
var customLogging = new LoggingMiddleware<string>(options);

// Pre-configured options
var errorsOnly = new LoggingMiddleware<string>(LoggingMiddlewareOptions.ErrorsOnly);
var verbose = new LoggingMiddleware<string>(LoggingMiddlewareOptions.Verbose);

// Attach the chosen instance when subscribing
await bus.SubscribeAsync<string>("orders", handler, middleware: customLogging);

TimingMiddleware

Measures message processing time with optional slow message detection:

// Default timing (adds to metadata)
var defaultTiming = new TimingMiddleware<string>();

// With slow message detection
var slowDetectionOptions = TimingMiddlewareOptions.WithSlowMessageDetection(TimeSpan.FromSeconds(1));
var slowDetectionTiming = new TimingMiddleware<string>(slowDetectionOptions);

// Custom configuration with metrics integration
// (_metrics and _logger are assumed to be available in scope)
var metricsOptions = new TimingMiddlewareOptions
{
    TimingCallback = (topic, msgId, elapsed) =>
        _metrics.RecordHistogram("message.processing.duration", elapsed.TotalMilliseconds),
    SlowMessageThreshold = TimeSpan.FromSeconds(2),
    SlowMessageCallback = (topic, msgId, elapsed) =>
        _logger.LogWarning("Slow message {MessageId}: {Elapsed}ms", msgId, elapsed.TotalMilliseconds),
    AddTimingToMetadata = true,
    TimingMetadataKey = "ProcessingTimeMs"
};
var metricsTiming = new TimingMiddleware<string>(metricsOptions);

CorrelationMiddleware

Ensures messages have correlation IDs for distributed tracing:

// Default correlation (generates GUID if missing)
var defaultCorrelation = new CorrelationMiddleware<string>();

// Custom configuration
// (_logger is assumed to be an ILogger already available in scope)
var options = new CorrelationMiddlewareOptions
{
    CorrelationIdKey = "X-Request-Id",
    GenerateCorrelationId = () => $"order-{Guid.NewGuid():N}",
    SetActivityBaggage = true,
    SetActivityTag = true,
    InheritFromActivity = true,
    OnCorrelationIdSet = (topic, msgId, corrId) =>
        _logger.LogDebug("Correlation: {CorrelationId}", corrId)
};
var customCorrelation = new CorrelationMiddleware<string>(options);

// For publish-side (enriches messages before publish)
var publishMiddleware = new CorrelationPublishMiddleware<string>();
await bus.PublishAsync("orders", "Order #12345", publishMiddleware);

PassthroughMiddleware

Base implementation for creating custom middleware by inheriting and overriding hooks:

public class CustomMiddleware<T> : PassthroughMiddleware<T>
{
    protected override ValueTask OnBeforeNextAsync(Omen<T> message, CancellationToken ct)
    {
        // Pre-processing logic
        message.Metadata["ProcessedAt"] = DateTime.UtcNow;
        return ValueTask.CompletedTask;
    }

    protected override ValueTask OnAfterNextAsync(Omen<T> message, CancellationToken ct)
    {
        // Post-processing logic
        return ValueTask.CompletedTask;
    }

    protected override ValueTask OnExceptionAsync(Omen<T> message, Exception ex, CancellationToken ct)
    {
        // Error handling logic
        return ValueTask.CompletedTask;
    }
}

Custom Middleware

Subscribe-Side Middleware

Subscribe-side middleware processes messages after they are received by a subscriber but before they reach the handler:

using OakIdeas.Omens.Core;
using OakIdeas.Omens.Middleware;

public class ValidationMiddleware<T> : IOmenMiddleware<T>
{
    public async ValueTask InvokeAsync(
        Omen<T> message,
        Func<Omen<T>, CancellationToken, ValueTask> next,
        CancellationToken cancellationToken)
    {
        // Validate message
        if (message.Data == null)
            throw new InvalidOperationException("Data cannot be null");

        await next(message, cancellationToken).ConfigureAwait(false);
    }
}

// Usage
await bus.SubscribeAsync<string>("orders", handler, middleware: new ValidationMiddleware<string>());

Publish-Side Middleware

Publish-side middleware executes once per publish operation, before the message is distributed to subscribers:

public class AuditMiddleware<T> : IPublishMiddleware<T>
{
    public async ValueTask InvokeAsync(
        Omen<T> message,
        Func<Omen<T>, CancellationToken, ValueTask> next,
        CancellationToken cancellationToken)
    {
        // Add audit metadata
        message.Metadata["PublishedBy"] = Environment.UserName;
        message.Metadata["PublishTimestamp"] = DateTime.UtcNow;
        
        await next(message, cancellationToken).ConfigureAwait(false);
    }
}

// Usage
await bus.PublishAsync("orders", "Order #12345", new AuditMiddleware<string>());

Global Middleware

Configure middleware that applies to all operations:

var globalMiddleware = new GlobalMiddlewareOptions
{
    PublishMiddleware = new IPublishMiddleware<object>[]
    {
        new CorrelationPublishMiddleware<object>(),
        new LoggingPublishMiddleware<object>()
    },
    SubscribeMiddleware = new IOmenMiddleware<object>[]
    {
        new LoggingMiddleware<object>(LoggingMiddlewareOptions.Verbose),
        new TimingMiddleware<object>()
    }
};

var bus = new OmenBus(null, globalMiddleware);

Middleware Pipeline Execution Order

When multiple middleware are chained, they execute in an "onion" pattern:

With middleware [A, B, C] and handler/publish H:

A.pre → B.pre → C.pre → H → C.post → B.post → A.post
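
The ordering above can be reproduced with a small stand-alone sketch. It does not use the OakIdeas.Omens types: the `Step` and `Middleware` delegates, the `Named` helper, and the `Compose` function are hypothetical stand-ins that only mirror the `(message, next, cancellationToken)` shape shown in the custom middleware examples, and the library's actual composition logic may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for a pipeline step and a middleware; not the library's API.
public delegate ValueTask Step(string message, CancellationToken ct);
public delegate ValueTask Middleware(string message, Step next, CancellationToken ct);

public static class OnionDemo
{
    // Builds a middleware that records "<name>.pre", calls next, then records "<name>.post".
    public static Middleware Named(string name, List<string> trace) =>
        async (message, next, ct) =>
        {
            trace.Add($"{name}.pre");
            await next(message, ct).ConfigureAwait(false);
            trace.Add($"{name}.post");
        };

    // Wraps the handler starting from the last middleware, so the first one ends up outermost.
    public static Step Compose(IReadOnlyList<Middleware> chain, Step handler)
    {
        Step current = handler;
        for (int i = chain.Count - 1; i >= 0; i--)
        {
            Middleware middleware = chain[i];
            Step inner = current;
            current = (message, ct) => middleware(message, inner, ct);
        }
        return current;
    }

    // Runs [A, B, C] around a handler H and returns the recorded order of events.
    public static async Task<List<string>> RunAsync()
    {
        var trace = new List<string>();
        var chain = new[] { Named("A", trace), Named("B", trace), Named("C", trace) };
        Step handler = (message, ct) =>
        {
            trace.Add("H");
            return ValueTask.CompletedTask;
        };

        await Compose(chain, handler)("Order #12345", CancellationToken.None);
        return trace;
    }
}
```

Joining the list returned by `OnionDemo.RunAsync()` with arrows gives the same sequence as the diagram. Building the chain from the last middleware inward is what makes the first entry the outermost layer.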

Global middleware executes before per-call middleware:

Global.pre → PerCall.pre → H → PerCall.post → Global.post
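
One way to picture this rule is to treat the global list as a prefix of the per-call list before the chain is wrapped around the handler. The sketch below is a simplified, synchronous model under that assumption; `GlobalOrderDemo` and `Named` are hypothetical names, and nothing here is taken from the library's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GlobalOrderDemo
{
    // Hypothetical model: a middleware is a function that wraps an inner action.
    private static Func<Action, Action> Named(string name, List<string> trace) =>
        next => () =>
        {
            trace.Add($"{name}.pre");
            next();
            trace.Add($"{name}.post");
        };

    // Combines global and per-call middleware and returns the recorded order of events.
    public static List<string> Run()
    {
        var trace = new List<string>();
        var global = new[] { Named("Global", trace) };
        var perCall = new[] { Named("PerCall", trace) };

        // Global entries come first in the combined chain, so they end up outermost.
        Action pipeline = global.Concat(perCall)
            .Reverse()
            .Aggregate((Action)(() => trace.Add("H")), (inner, wrap) => wrap(inner));

        pipeline();
        return trace;
    }
}
```

In this model `GlobalOrderDemo.Run()` records Global.pre, PerCall.pre, H, PerCall.post, Global.post, which matches the order stated above.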

License

MIT License - see LICENSE file for details.

Compatible target frameworks

.NET: net10.0 is compatible. Computed: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.

NuGet packages (1)

One NuGet package depends on OakIdeas.Omens.Middleware:

OakIdeas.Omens: In-process pub/sub messaging on .NET Channels with support for topics, consumer groups, retry/DLQ, delayed messages, middleware, and tracing hooks.

Version Downloads Last Updated
0.1.7 1,572 12/1/2025