OakIdeas.Omens.Middleware
0.1.7
Middleware abstractions and built-in implementations for OakIdeas.Omens - extensibility interfaces for message processing pipelines.
Overview
This package contains the core middleware interfaces and built-in middleware implementations that allow you to extend OakIdeas.Omens message processing with custom behavior such as logging, validation, transformation, correlation tracking, or error handling.
Features
- IOmenMiddleware<T>: Interface for implementing custom subscribe-side message processing middleware
- IPublishMiddleware<T>: Interface for implementing custom publish-side middleware
- GlobalMiddlewareOptions: Configuration for global middleware applied to all operations
- Built-in Middleware Implementations:
  - LoggingMiddleware<T>/LoggingPublishMiddleware<T>: Configurable logging for message processing
  - TimingMiddleware<T>/TimingPublishMiddleware<T>: Performance timing and slow message detection
  - CorrelationMiddleware<T>/CorrelationPublishMiddleware<T>: Distributed tracing with correlation IDs
  - PassthroughMiddleware<T>/PassthroughPublishMiddleware<T>: Base implementations for extension
Installation
dotnet add package OakIdeas.Omens.Middleware
Built-in Middleware
LoggingMiddleware
Logs message processing lifecycle events with configurable templates and log levels:
// Default logging to console
var defaultLogging = new LoggingMiddleware<string>();

// Custom logging configuration
var options = new LoggingMiddlewareOptions
{
    Logger = message => _logger.LogInformation(message),
    LogOnBefore = true,
    LogOnAfter = true,
    LogOnError = true,
    BeforeMessageTemplate = "START: {Topic} - {MessageId}",
    AfterMessageTemplate = "END: {Topic} - {MessageId}",
    LogPrefix = "[MyApp]"
};
var middleware = new LoggingMiddleware<string>(options);

// Pre-configured options
var errorsOnly = new LoggingMiddleware<string>(LoggingMiddlewareOptions.ErrorsOnly);
var verbose = new LoggingMiddleware<string>(LoggingMiddlewareOptions.Verbose);

// Attach the middleware when subscribing
await bus.SubscribeAsync<string>("orders", handler, middleware: middleware);
TimingMiddleware
Measures message processing time with optional slow message detection:
// Default timing (adds to metadata)
var defaultTiming = new TimingMiddleware<string>();

// With slow message detection
var slowDetectionOptions = TimingMiddlewareOptions.WithSlowMessageDetection(TimeSpan.FromSeconds(1));
var slowDetection = new TimingMiddleware<string>(slowDetectionOptions);

// Custom configuration with metrics integration
var options = new TimingMiddlewareOptions
{
    TimingCallback = (topic, msgId, elapsed) =>
        _metrics.RecordHistogram("message.processing.duration", elapsed.TotalMilliseconds),
    SlowMessageThreshold = TimeSpan.FromSeconds(2),
    SlowMessageCallback = (topic, msgId, elapsed) =>
        _logger.LogWarning("Slow message {MessageId}: {Elapsed}ms", msgId, elapsed.TotalMilliseconds),
    AddTimingToMetadata = true,
    TimingMetadataKey = "ProcessingTimeMs"
};
var middleware = new TimingMiddleware<string>(options);
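The measurement itself can be illustrated without the library. The following is a minimal sketch, assuming a Stopwatch-based wrapper; `TimeAsync`, `slowThreshold`, `slowMessages`, and the plain dictionary standing in for message metadata are hypothetical names for illustration and are not part of OakIdeas.Omens, whose internals may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Stand-ins for message metadata and a slow-message callback (illustrative only).
var metadata = new Dictionary<string, object>();
var slowMessages = new List<string>();
var slowThreshold = TimeSpan.FromMilliseconds(100);

// Time the wrapped step, record the elapsed milliseconds, and flag slow messages.
async Task TimeAsync(string messageId, Func<Task> next)
{
    var stopwatch = Stopwatch.StartNew();
    try
    {
        await next();
    }
    finally
    {
        // The finally block ensures timing is recorded even when the handler throws.
        stopwatch.Stop();
        metadata["ProcessingTimeMs"] = stopwatch.Elapsed.TotalMilliseconds;
        if (stopwatch.Elapsed >= slowThreshold)
        {
            slowMessages.Add(messageId);
        }
    }
}

await TimeAsync("fast", () => Task.CompletedTask);
await TimeAsync("slow", () => Task.Delay(300));

Console.WriteLine($"Slow messages: {string.Join(", ", slowMessages)}");
```

Recording the elapsed time in a `finally` block is one reasonable design choice: failed messages are often the slow ones, so skipping them would bias the metrics.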
CorrelationMiddleware
Ensures messages have correlation IDs for distributed tracing:
// Default correlation (generates GUID if missing)
var defaultCorrelation = new CorrelationMiddleware<string>();

// Custom configuration
var options = new CorrelationMiddlewareOptions
{
    CorrelationIdKey = "X-Request-Id",
    GenerateCorrelationId = () => $"order-{Guid.NewGuid():N}",
    SetActivityBaggage = true,
    SetActivityTag = true,
    InheritFromActivity = true,
    OnCorrelationIdSet = (topic, msgId, corrId) =>
        _logger.LogDebug("Correlation: {CorrelationId}", corrId)
};
var middleware = new CorrelationMiddleware<string>(options);

// For publish-side (enriches messages before publish)
var publishMiddleware = new CorrelationPublishMiddleware<string>();
await bus.PublishAsync("orders", "Order #12345", publishMiddleware);
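To make the idea behind options such as InheritFromActivity, SetActivityBaggage, and SetActivityTag concrete, here is a rough sketch built only on `System.Diagnostics.Activity`. It is an assumption about what such behavior could look like, not the library's implementation; `EnsureCorrelationId` and the dictionaries standing in for message metadata are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

const string CorrelationIdKey = "X-Request-Id";

// Reuse an ID from the metadata or the ambient Activity; otherwise generate one.
string EnsureCorrelationId(IDictionary<string, object> metadata)
{
    string? correlationId =
        metadata.TryGetValue(CorrelationIdKey, out var existing) ? existing as string : null;

    correlationId ??= Activity.Current?.GetBaggageItem(CorrelationIdKey);
    correlationId ??= Guid.NewGuid().ToString("N");

    metadata[CorrelationIdKey] = correlationId;

    // Mirror the ID onto the current Activity so downstream spans can see it.
    var activity = Activity.Current;
    if (activity is not null && activity.GetBaggageItem(CorrelationIdKey) is null)
    {
        activity.AddBaggage(CorrelationIdKey, correlationId);
    }
    activity?.SetTag(CorrelationIdKey, correlationId);

    return correlationId;
}

// An ambient Activity that already carries a correlation ID in its baggage.
using var ambient = new Activity("publish-order").Start();
ambient.AddBaggage(CorrelationIdKey, "order-abc123");

var fresh = new Dictionary<string, object>();
var preset = new Dictionary<string, object> { [CorrelationIdKey] = "existing-id" };

Console.WriteLine(EnsureCorrelationId(fresh));   // inherited from the Activity baggage
Console.WriteLine(EnsureCorrelationId(preset));  // the ID already on the message is kept
```

The lookup order shown here (message metadata, then ambient Activity, then a generated value) is one plausible precedence; check the package's own documentation for the order it actually applies.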
PassthroughMiddleware
Base implementation for creating custom middleware by inheriting and overriding hooks:
public class CustomMiddleware<T> : PassthroughMiddleware<T>
{
    protected override ValueTask OnBeforeNextAsync(Omen<T> message, CancellationToken ct)
    {
        // Pre-processing logic
        message.Metadata["ProcessedAt"] = DateTime.UtcNow;
        return ValueTask.CompletedTask;
    }

    protected override ValueTask OnAfterNextAsync(Omen<T> message, CancellationToken ct)
    {
        // Post-processing logic
        return ValueTask.CompletedTask;
    }

    protected override ValueTask OnExceptionAsync(Omen<T> message, Exception ex, CancellationToken ct)
    {
        // Error handling logic
        return ValueTask.CompletedTask;
    }
}
Custom Middleware
Subscribe-Side Middleware
Subscribe-side middleware processes messages after they are received by a subscriber but before they reach the handler:
using OakIdeas.Omens.Core;
using OakIdeas.Omens.Middleware;

public class ValidationMiddleware<T> : IOmenMiddleware<T>
{
    public async ValueTask InvokeAsync(
        Omen<T> message,
        Func<Omen<T>, CancellationToken, ValueTask> next,
        CancellationToken cancellationToken)
    {
        // Validate message
        if (message.Data == null)
            throw new InvalidOperationException("Data cannot be null");

        await next(message, cancellationToken).ConfigureAwait(false);
    }
}

// Usage
await bus.SubscribeAsync<string>("orders", handler, middleware: new ValidationMiddleware<string>());
Publish-Side Middleware
Publish-side middleware executes once per publish operation, before the message is distributed to subscribers:
public class AuditMiddleware<T> : IPublishMiddleware<T>
{
    public async ValueTask InvokeAsync(
        Omen<T> message,
        Func<Omen<T>, CancellationToken, ValueTask> next,
        CancellationToken cancellationToken)
    {
        // Add audit metadata
        message.Metadata["PublishedBy"] = Environment.UserName;
        message.Metadata["PublishTimestamp"] = DateTime.UtcNow;

        await next(message, cancellationToken).ConfigureAwait(false);
    }
}

// Usage
await bus.PublishAsync("orders", "Order #12345", new AuditMiddleware<string>());
Global Middleware
Configure middleware that applies to all operations:
var globalMiddleware = new GlobalMiddlewareOptions
{
    PublishMiddleware = new IPublishMiddleware<object>[]
    {
        new CorrelationPublishMiddleware<object>(),
        new LoggingPublishMiddleware<object>()
    },
    SubscribeMiddleware = new IOmenMiddleware<object>[]
    {
        new LoggingMiddleware<object>(LoggingMiddlewareOptions.Verbose),
        new TimingMiddleware<object>()
    }
};

var bus = new OmenBus(null, globalMiddleware);
Middleware Pipeline Execution Order
When multiple middleware are chained, they execute in an "onion" pattern:
With middleware [A, B, C] and handler/publish H:
A.pre → B.pre → C.pre → H → C.post → B.post → A.post
Global middleware executes before per-call middleware:
Global.pre → PerCall.pre → H → PerCall.post → Global.post
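The ordering above can be reproduced with a small, self-contained sketch. It deliberately avoids the OakIdeas.Omens types: `Wrap`, `Compose`, and the string message are hypothetical stand-ins, and the library's own composition code may differ. The sketch only shows how folding middleware from the inside out yields the onion order, with global middleware placed ahead of per-call middleware.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var trace = new List<string>();

// A middleware here is a function that receives the message and the next step.
Func<string, Func<string, Task>, Task> Wrap(string name) =>
    async (message, next) =>
    {
        trace.Add($"{name}.pre");
        await next(message);
        trace.Add($"{name}.post");
    };

// Fold from the innermost middleware outward so the first entry ends up outermost.
Func<string, Task> Compose(
    IEnumerable<Func<string, Func<string, Task>, Task>> middleware,
    Func<string, Task> handler)
{
    var pipeline = handler;
    foreach (var current in middleware.Reverse())
    {
        var next = pipeline;
        pipeline = message => current(message, next);
    }
    return pipeline;
}

var globalMiddleware = new[] { Wrap("Global") };
var perCallMiddleware = new[] { Wrap("A"), Wrap("B") };

// Global middleware is listed first, so it wraps the per-call middleware.
var run = Compose(globalMiddleware.Concat(perCallMiddleware), message =>
{
    trace.Add("H");
    return Task.CompletedTask;
});

await run("Order #12345");

// Prints: Global.pre → A.pre → B.pre → H → B.post → A.post → Global.post
Console.WriteLine(string.Join(" → ", trace));
```

One consequence of this shape is that a middleware that never calls `next` short-circuits everything inside it, including the handler.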
License
MIT License - see LICENSE file for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies
- net10.0
  - OakIdeas.Omens.Core (>= 0.1.7)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on OakIdeas.Omens.Middleware:
| Package | Downloads |
|---|---|
| OakIdeas.Omens: In-Process Pub/Sub messaging on .NET Channels with support for topics, consumer groups, retry/DLQ, delayed messages, middleware, and tracing hooks. | |
| Version | Downloads | Last Updated |
|---|---|---|
| 0.1.7 | 1,572 | 12/1/2025 |