SimplyWorks.Bus
8.1.9
dotnet add package SimplyWorks.Bus --version 8.1.9
NuGet\Install-Package SimplyWorks.Bus -Version 8.1.9
<PackageReference Include="SimplyWorks.Bus" Version="8.1.9" />
<PackageVersion Include="SimplyWorks.Bus" Version="8.1.9" />
<PackageReference Include="SimplyWorks.Bus" />
paket add SimplyWorks.Bus --version 8.1.9
#r "nuget: SimplyWorks.Bus, 8.1.9"
#:package SimplyWorks.Bus@8.1.9
#addin nuget:?package=SimplyWorks.Bus&version=8.1.9
#tool nuget:?package=SimplyWorks.Bus&version=8.1.9
SimplyWorks.Bus
A lightweight .NET 8 message bus library built on top of RabbitMQ, designed for event-driven microservice architectures in ASP.NET Core.
The library ships as three complementary NuGet packages:
| Package | Purpose |
|---|---|
SimplyWorks.Bus |
Core runtime — publishing, consuming, retries, dead-letter, tracing |
SimplyWorks.Bus.RabbitMqExtensions |
Public contracts, consumer interfaces, dashboard data models |
SimplyWorks.Bus.RabbitMqViewer |
Built-in HTMX operations dashboard (dark + light mode, zero JS framework) |
Table of Contents
- Features
- Installation
- Quick Start
- Publishing Messages
- Consuming Messages
- Broadcasting & Listeners
- Per-Queue Configuration
- Extended Consumer Options (IConsumeExtended)
- Monitoring — IConsumerReader
- Error Queue Inspection — IErrorQueueReader
- Operational Events Pipeline
- Dashboard Data Service — IBusDashboardDataService
- Custom Alert Thresholds — IAlertEvaluator
- Building a Custom Dashboard
- Operations Viewer Dashboard
- Full BusOptions Reference
- Architecture
- Testing
- Dependencies
Features
- 🚌 Simple Message Publishing — typed and string-based
IPublish - 📡 Broadcasting — fan-out to all application instances via
IBroadcast/IListen<T> - 🔄 Automatic Retries — configurable per-queue retry counts and delay with dead-letter routing
- 🎯 Typed Consumers —
IConsume<T>with strongly-typed message deserialization - ⚙️ Extended Consumer Options — per-consumer prefetch and priority via
IConsumeExtended - 🔐 JWT Propagation — user context forwarded through message headers across services
- 📊 Queue Monitoring — live queue depth, rates, and consumer counts via
IConsumerReader - 🔍 Error Queue Inspection — peek retry and dead-letter queues via
IErrorQueueReader - 📈 Operational Events — structured lifecycle events with
ActivitySourcetracing andMetermetrics - 🖥️ Built-in Dashboard — HTMX admin UI with dark/light mode toggle, form-based login, CSS refresh animations, and critical status tooltips via
SimplyWorks.Bus.RabbitMqViewer - 🏗️ Custom Dashboard API — all data exposed through
IBusDashboardDataServiceinSimplyWorks.Bus.RabbitMqExtensions— build your own UI without the viewer package - 🧪 Testing Support — mock publisher for unit testing
Installation
# Core — always required
dotnet add package SimplyWorks.Bus
# Public contracts + monitoring interfaces (no RabbitMQ.Client dependency)
dotnet add package SimplyWorks.Bus.RabbitMqExtensions
# Optional: built-in operations dashboard
dotnet add package SimplyWorks.Bus.RabbitMqViewer
Quick Start
1. Connection string
Add RabbitMQ connection string to your appsettings.json:
{
"ConnectionStrings": {
"RabbitMQ": "amqp://guest:guest@localhost:5672/"
}
}
2. Service registration
In your Startup.cs or Program.cs:
services.AddBus(config =>
{
config.ApplicationName = "MyApp";
// Optional JWT configuration
config.Token.Key = Configuration["Token:Key"];
config.Token.Issuer = Configuration["Token:Issuer"];
config.Token.Audience = Configuration["Token:Audience"];
});
services.AddBusPublish(); // registers IPublish, IBroadcast
services.AddBusConsume(); // registers IHostedService consumer + scans calling assembly
Publishing Messages
public class OrderController : ControllerBase
{
private readonly IPublish _publish;
private readonly IBroadcast _broadcast;
public OrderController(IPublish publish, IBroadcast broadcast)
{
_publish = publish;
_broadcast = broadcast;
}
[HttpPost]
public async Task<IActionResult> Create(CreateOrderRequest req)
{
// Routed to consumers subscribed to OrderCreated
await _publish.Publish(new OrderCreated { OrderId = Guid.NewGuid() });
// Fan-out to all connected application instances
await _broadcast.Broadcast(new OrderNotification { Message = "New order" });
return Ok();
}
}
String-based publish (useful for dynamic routing):
await _publish.Publish("OrderCreated", jsonPayload);
await _publish.Publish("OrderCreated", payloadBytes);
Consuming Messages
Typed consumer
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
public async Task Process(OrderCreated message)
{
// Auto-acked on success, rejected/retried on exception
}
}
Multi-message string consumer
public class GenericConsumer : IConsume
{
public Task<IEnumerable<string>> GetMessageTypeNames()
=> Task.FromResult<IEnumerable<string>>(new[] { "OrderCreated", "OrderCancelled" });
public async Task Process(string typeName, string message) { ... }
}
Optional failure handler
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
public async Task Process(OrderCreated message) { ... }
// Called on every failure (including retries) — optional
public async Task OnFail(Exception ex) { ... }
}
Accessing request context
public class SecureConsumer : IConsume<SecureMessage>
{
private readonly RequestContext _ctx;
public SecureConsumer(RequestContext ctx) => _ctx = ctx;
public async Task Process(SecureMessage msg)
{
var user = _ctx.User;
var correlationId = _ctx.CorrelationId;
var remaining = _ctx.GetValue("RemainingRetries");
}
}
Consumer registration
services.AddBusConsume(); // scan calling assembly
services.AddBusConsume(typeof(OrderCreatedConsumer).Assembly); // specific assembly
Broadcasting & Listeners
Broadcasts are fan-out messages delivered to every running instance simultaneously.
// Send
await _broadcast.Broadcast(new PricingUpdated { Version = 42 });
// Trigger live consumer refresh across all instances
await _broadcast.RefreshConsumers();
// Receive
public class PricingUpdatedListener : IListen<PricingUpdated>
{
public async Task Process(PricingUpdated message) { ... }
public async Task OnFail(Exception ex) { ... } // optional
}
services.AddBusListen(); // scan calling assembly
services.AddBusListen(typeof(PricingUpdatedListener).Assembly); // specific assembly
Per-Queue Configuration
Fine-tune individual queues via BusOptions.AddQueueOption. The key is "{ConsumerClass}.{MessageType}" (case-insensitive).
services.AddBus(config =>
{
config.AddQueueOption(
"OrderCreatedConsumer.OrderCreated",
prefetch: 10,
retryCount: 3,
retryAfterSeconds: 30);
// Enable priority queue (0–10 range)
config.AddQueueOption(
"PaymentConsumer.PaymentProcessed",
priority: 10);
});
Extended Consumer Options (IConsumeExtended)
For consumers that need per-instance prefetch or priority without touching global config:
// Typed consumer with runtime options
public class PriorityOrderConsumer : IConsumeExtended<OrderCreated>
{
public async Task Process(OrderCreated message) { ... }
public Task<ConsumerOptions> GetConsumerOptions() =>
Task.FromResult(new ConsumerOptions { Prefetch = 8, Priority = 5 });
}
// Multi-message consumer with per-type options
public class MultiConsumer : IConsumeExtended
{
public Task<IEnumerable<string>> GetMessageTypeNames() =>
Task.FromResult<IEnumerable<string>>(new[] { "TypeA", "TypeB" });
public Task<IDictionary<string, ConsumerOptions>> GetMessageTypeNamesWithOptions() =>
Task.FromResult<IDictionary<string, ConsumerOptions>>(
new Dictionary<string, ConsumerOptions>
{
["TypeA"] = new ConsumerOptions { Prefetch = 4 },
["TypeB"] = new ConsumerOptions { Prefetch = 16, Priority = 3 }
});
public async Task Process(string typeName, string message) { ... }
}
IConsumeExtended consumers are hot-reloadable — call IBroadcast.RefreshConsumers() to apply updated options across all running instances without restart.
Monitoring — IConsumerReader
Registered automatically by AddBus(). Returns live queue statistics from the RabbitMQ Management API with configurable caching.
// All consumers
var all = await _reader.GetAllConsumersCount();
// Typed consumer + message type
var order = await _reader.GetConsumerCount<OrderConsumer, OrderCreated>();
// Multi-message consumer by message name
var generic = await _reader.GetConsumerCount<GenericConsumer>("OrderCreated");
// All queues for a consumer class
var byClass = await _reader.GetConsumerCount<OrderConsumer>();
ConsumerCount fields:
| Field | Description |
|---|---|
Name |
Consumer class name |
MessageName |
Message type name |
TotalNodes |
Active consumer instances |
ProcessingCount |
Messages in-flight (unacknowledged) |
QueueCount |
Messages ready in main queue |
RetryCount |
Messages in retry queue |
FailedCount |
Messages in dead-letter queue |
Priority |
Consumer priority level |
Prefetch |
QoS prefetch count |
IncomingRate |
Publish rate (msg/s) |
ProcessingRate |
Deliver rate to consumers (msg/s) |
AckRate |
Acknowledge rate (msg/s) |
services.AddBus(config =>
{
config.MonitoringCacheSeconds = 10; // cache duration 3–60 s, default 5
config.ManagementUrl = "http://localhost:15672"; // defaults from AMQP URI
config.ManagementUsername = "guest";
config.ManagementPassword = "guest";
config.VirtualHost = "/";
});
Error Queue Inspection — IErrorQueueReader
Peek at messages in retry or dead-letter queues without removing them.
// Typed consumer
var failed = await _errorReader.Peek<OrderConsumer, OrderCreated>(ErrorQueueType.Bad, count: 20);
// Multi-message consumer
var retrying = await _errorReader.Peek<GenericConsumer>("OrderCreated", ErrorQueueType.Retry);
// Raw queue name
var raw = await _errorReader.PeekByQueueName("v3.production.orderconsumer.ordercreated.bad");
ErrorMessage properties:
| Property | Description |
|---|---|
RawBody |
Original JSON payload |
Exchange |
Exchange where message was published |
RoutingKey |
Routing key used at publish |
Properties |
All AMQP properties |
Headers |
Extracted AMQP headers |
CorrelationId |
Correlation ID if set |
ExceptionHistory |
All recorded exceptions oldest-first |
LastException |
Most recent exception string |
Operational Events Pipeline
SW.Bus emits strongly-typed lifecycle events that flow through a lock-free buffered pipeline. The goal is observability and diagnostics, not logging.
Events emitted
| Event | Trigger |
|---|---|
PublishStarted |
Message about to be published |
PublishCompleted |
Publish succeeded (includes duration ms, payload bytes) |
PublishFailed |
Publish threw an exception |
MessageProcessingStarted |
Consumer received and started processing |
MessageProcessingCompleted |
Processed successfully (includes duration ms) |
MessageProcessingFailed |
Consumer threw an exception (type, message, stack trace) |
MessageRetryScheduled |
Message rejected back to retry queue |
MessageMovedToDeadLetter |
Message exhausted retries |
ConsumerConnected |
Consumer channel attached to a queue |
ConsumerDisconnected |
Consumer channel shut down |
QueueBackpressureDetected |
Queue depth exceeded QueueBackpressureThreshold |
Every event carries: TimestampUtc, SchemaVersion, EventName, MachineName, Environment, ApplicationName, Exchange, QueueName, ConsumerName, MessageType, MessageId, CorrelationId, CausationId, TraceId, SpanId, DeliveryTag.
Pipeline architecture
Consumer / Publisher hot path
│ (non-blocking TryWrite)
▼
BoundedChannel<IOperationalEvent> ← configurable capacity, drop-oldest on full
│
▼ (BackgroundService, configurable flush interval)
OperationalEventDispatcher
├── InMemoryOperationalEventStore (ring buffer — always present)
└── [your IOperationalEventBatchSink registrations]
Plugging in a custom external sink
using SW.Bus.RabbitMqExtensions;
public class ElasticsearchSink : IOperationalEventBatchSink
{
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken cancellationToken = default)
{
await _esClient.BulkAsync(events, cancellationToken);
}
}
// Sinks stack additively — InMemoryStore is always active alongside yours
services.AddSingleton<IOperationalEventBatchSink, ElasticsearchSink>();
OpenTelemetry tracing
services.AddOpenTelemetry()
.WithTracing(b => b
.AddSource("SimplyWorks.Bus") // ActivitySource name
.AddOtlpExporter());
Prometheus / OpenTelemetry metrics (meter name: "SimplyWorks.Bus")
services.AddOpenTelemetry()
.WithMetrics(b => b
.AddMeter("SimplyWorks.Bus")
.AddPrometheusExporter());
| Instrument | Type | Description |
|---|---|---|
sw_bus_publish_started_total |
Counter | Publish attempts |
sw_bus_publish_completed_total |
Counter | Successful publishes |
sw_bus_publish_failed_total |
Counter | Failed publishes |
sw_bus_processing_started_total |
Counter | Messages picked up |
sw_bus_processing_completed_total |
Counter | Messages processed successfully |
sw_bus_processing_failed_total |
Counter | Messages that threw exceptions |
sw_bus_retry_scheduled_total |
Counter | Messages sent to retry queue |
sw_bus_dead_letter_total |
Counter | Messages moved to dead-letter |
sw_bus_operational_event_dropped_total |
Counter | Events dropped (buffer full) |
sw_bus_processing_latency_ms |
Histogram | Consumer processing time |
sw_bus_publish_latency_ms |
Histogram | Publish time |
Pipeline configuration
services.AddBus(config =>
{
config.OperationalEventsEnabled = true; // default
config.OperationalEventsBufferCapacity = 8192; // channel buffer before drop
config.OperationalEventsBatchSize = 256; // events per flush batch
config.OperationalEventsFlushIntervalMs = 1000; // ms between flushes
config.OperationalEventsDropOldest = true; // drop strategy when full
config.OperationalEventsSchemaVersion = "1.0"; // stamped on every event
config.OperationalEventsStoreCapacity = 10000; // in-memory ring buffer size
});
Dashboard Data Service — IBusDashboardDataService
All dashboard data is exposed through IBusDashboardDataService (defined in SimplyWorks.Bus.RabbitMqExtensions, implemented and registered by SimplyWorks.Bus). Inject it directly to build your own custom dashboard, REST API, or health probe — no dependency on SimplyWorks.Bus.RabbitMqViewer needed. See Building a Custom Dashboard for a complete guide.
public class OpsController : ControllerBase
{
private readonly IBusDashboardDataService _dash;
public OpsController(IBusDashboardDataService dash) => _dash = dash;
[HttpGet("ops/summary")]
public async Task<IActionResult> Summary()
=> Ok(await _dash.GetSummaryAsync());
[HttpGet("ops/consumers")]
public async Task<IActionResult> Consumers()
=> Ok(await _dash.GetConsumerHealthAsync());
[HttpGet("ops/queues")]
public async Task<IActionResult> Queues()
=> Ok(await _dash.GetQueueDetailsAsync());
[HttpGet("ops/retries")]
public async Task<IActionResult> Retries()
=> Ok(await _dash.GetRetryAnalysisAsync());
[HttpGet("ops/dead-letters")]
public async Task<IActionResult> DeadLetters()
=> Ok(await _dash.GetDeadLetterSummaryAsync());
[HttpGet("ops/alerts")]
public async Task<IActionResult> Alerts()
=> Ok(await _dash.GetAlertsAsync());
[HttpGet("ops/events")]
public IActionResult Events(
[FromQuery] string? consumer, [FromQuery] string? messageType,
[FromQuery] string? correlationId, [FromQuery] string? traceId,
[FromQuery] string? eventName, [FromQuery] int limit = 200)
=> Ok(_dash.GetRecentEvents(new OperationalEventFilter(
ConsumerName: consumer, MessageType: messageType,
CorrelationId: correlationId, TraceId: traceId,
EventName: eventName, Limit: limit)));
}
View models:
| Record | Key fields |
|---|---|
DashboardSummary |
TotalConsumers, UnhealthyConsumers, DisconnectedConsumers, TotalQueueDepth, TotalRetryBacklog, TotalDeadLetterBacklog, TotalIncomingRate, TotalAckRate, ActiveAlerts, LastUpdatedUtc |
ConsumerHealthView |
All ConsumerCount fields + QueueName, IsBackpressured, HealthStatus (AlertSeverity) |
QueueDetailView |
Main/retry/dead-letter queue names and depths, consumer count, rates |
RetryAnalysisView |
Per-consumer retry backlog ordered by size with severity |
DeadLetterSummaryView |
Per-consumer DL count, last exception type/message, last failure timestamp |
DashboardAlert |
Severity (Info/Warning/Critical), Title, Detail, QueueName, ConsumerName, TimestampUtc |
OperationalEventFilter fields (all optional, string fields are case-insensitive substring matches):
ApplicationName, ConsumerName, MessageType, CorrelationId, TraceId, QueueName, EventName (exact), From, To, Limit (default 200)
Custom Alert Thresholds — IAlertEvaluator
Default thresholds are configured on BusOptions:
services.AddBus(config =>
{
config.AlertRetryWarningThreshold = 10; // Warning when retry backlog ≥ N
config.AlertRetryCriticalThreshold = 100; // Critical when retry backlog ≥ N
config.AlertDeadLetterCriticalThreshold = 100; // Critical when DL count ≥ N
config.QueueBackpressureThreshold = 5000; // backpressure warning threshold
});
To apply domain-specific or SLA-based thresholds, replace the default evaluator:
public class MyAlertEvaluator : IAlertEvaluator
{
public IReadOnlyList<DashboardAlert> Evaluate(ConsumerHealthView[] consumers)
{
var alerts = new List<DashboardAlert>();
foreach (var c in consumers)
{
if (c.Name == "PaymentConsumer" && c.FailedCount > 0)
alerts.Add(new DashboardAlert(
AlertSeverity.Critical, "Payment Dead Letter",
$"{c.FailedCount} failed payments require immediate attention.",
c.QueueName, c.Name, DateTime.UtcNow));
}
return alerts;
}
}
// Register before AddBus() — TryAddSingleton means yours wins
services.AddSingleton<IAlertEvaluator, MyAlertEvaluator>();
services.AddBus(...);
Building a Custom Dashboard
If you do not want to use SimplyWorks.Bus.RabbitMqViewer — e.g. you want React, Blazor, an existing admin framework, or a JSON API for a mobile app — everything you need is in SimplyWorks.Bus.RabbitMqExtensions. You never need to install the viewer package.
Package requirements
dotnet add package SimplyWorks.Bus # core runtime (always required)
dotnet add package SimplyWorks.Bus.RabbitMqExtensions # contracts + data service
# SimplyWorks.Bus.RabbitMqViewer is NOT needed
What the library provides
All interfaces below are registered automatically by services.AddBus(...). Just inject what you need.
| Interface | Description |
|---|---|
IBusDashboardDataService |
Aggregate read layer — summary, consumer health, queues, retries, dead letters, alerts, events |
IConsumerReader |
Raw per-consumer queue statistics from the RabbitMQ Management API (with configurable caching) |
IErrorQueueReader |
Peek at messages in retry and dead-letter queues without removing them |
IOperationalEventStore |
Query the in-memory event ring buffer with OperationalEventFilter |
IAlertEvaluator |
Evaluate a ConsumerHealthView[] snapshot and return DashboardAlert objects |
IOperationalEventBatchSink |
Implement to stream event batches to external systems (Elasticsearch, ClickHouse, etc.) |
Minimal REST API example (no viewer package)
// Program.cs
builder.Services.AddBus(config => { config.ApplicationName = "MyApp"; /* ... */ });
builder.Services.AddBusPublish();
builder.Services.AddBusConsume();
var app = builder.Build();
app.MapGet("/ops/summary", async (IBusDashboardDataService d) => await d.GetSummaryAsync());
app.MapGet("/ops/consumers", async (IBusDashboardDataService d) => await d.GetConsumerHealthAsync());
app.MapGet("/ops/queues", async (IBusDashboardDataService d) => await d.GetQueueDetailsAsync());
app.MapGet("/ops/retries", async (IBusDashboardDataService d) => await d.GetRetryAnalysisAsync());
app.MapGet("/ops/dead-letters", async (IBusDashboardDataService d) => await d.GetDeadLetterSummaryAsync());
app.MapGet("/ops/alerts", async (IBusDashboardDataService d) => await d.GetAlertsAsync());
app.MapGet("/ops/events", (IBusDashboardDataService d,
string? consumer, string? messageType,
string? eventName, int limit = 100) =>
d.GetRecentEvents(new OperationalEventFilter(
ConsumerName: consumer, MessageType: messageType,
EventName: eventName, Limit: limit)));
app.Run();
Querying the operational event store directly
public class MyOpsService
{
private readonly IOperationalEventStore _store;
public MyOpsService(IOperationalEventStore store) => _store = store;
public IReadOnlyList<IOperationalEvent> GetRecentFailures(int limit = 50) =>
_store.GetRecent(new OperationalEventFilter(
EventName: "MessageProcessingFailed",
Limit: limit));
public long TotalEventsSinceStartup => _store.TotalReceived;
}
Pattern matching on event records
All events are strongly typed C# records inheriting from OperationalEventBase. Use pattern matching to extract type-specific fields:
foreach (var evt in _store.GetRecent())
{
switch (evt)
{
case MessageProcessingFailed f:
Console.WriteLine($"[FAIL] {f.ConsumerName} — {f.ExceptionType}: {f.ExceptionMessage}");
break;
case MessageProcessingCompleted c:
Console.WriteLine($"[OK] {c.ConsumerName} in {c.ProcessingDurationMs:F1} ms");
break;
case MessageRetryScheduled r:
Console.WriteLine($"[RETRY] {r.ConsumerName} attempt {r.RetryCount}, {r.RemainingRetryCount} remaining");
break;
case MessageMovedToDeadLetter dl:
Console.WriteLine($"[DLQ] {dl.ConsumerName} → {dl.DeadLetterRoutingKey}");
break;
case QueueBackpressureDetected bp:
Console.WriteLine($"[PRESSURE] {bp.QueueName} depth={bp.QueueDepth} threshold={bp.Threshold}");
break;
case ConsumerConnected cc:
Console.WriteLine($"[CONNECT] {cc.ConsumerName} tag={cc.ConsumerTag}");
break;
case ConsumerDisconnected cd:
Console.WriteLine($"[DISCONN] {cd.ConsumerName} reason={cd.Reason}");
break;
case PublishFailed pf:
Console.WriteLine($"[PUB FAIL] {pf.MessageType} — {pf.ExceptionType}");
break;
}
}
Replacing or extending the in-memory event store
Register IOperationalEventBatchSink to stream events alongside the built-in ring buffer:
public class ElasticsearchSink : IOperationalEventBatchSink
{
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken cancellationToken = default)
=> await _esClient.BulkAsync(events, cancellationToken);
}
services.AddSingleton<IOperationalEventBatchSink, ElasticsearchSink>();
services.AddBus(...); // in-memory store and your sink both active
To replace query access with your own persistent store:
public class ClickHouseEventStore : IOperationalEventStore, IOperationalEventBatchSink
{
public long TotalReceived => /* query row count */;
public IReadOnlyList<IOperationalEvent> GetRecent(OperationalEventFilter? filter = null)
=> /* translate filter → SQL → query */;
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken ct = default)
=> await _clickHouse.BulkInsertAsync(events, ct);
}
// Register BEFORE AddBus() — TryAddSingleton means yours wins
services.AddSingleton<ClickHouseEventStore>();
services.AddSingleton<IOperationalEventStore>(sp => sp.GetRequiredService<ClickHouseEventStore>());
services.AddSingleton<IOperationalEventBatchSink>(sp => sp.GetRequiredService<ClickHouseEventStore>());
services.AddBus(...);
Health probe using ConsumerHealthView.HealthStatus
builder.Services.AddHealthChecks()
.AddAsyncCheck("bus-consumers", async (IBusDashboardDataService dash, ct) =>
{
var consumers = await dash.GetConsumerHealthAsync(ct);
var disconnected = consumers.Where(c => c.TotalNodes == 0).ToList();
var critical = consumers.Where(c => c.HealthStatus == AlertSeverity.Critical).ToList();
if (disconnected.Any())
return HealthCheckResult.Unhealthy(
$"{disconnected.Count} consumer(s) disconnected: " +
string.Join(", ", disconnected.Select(c => c.Name)));
if (critical.Any())
return HealthCheckResult.Degraded($"{critical.Count} consumer(s) in critical state.");
return HealthCheckResult.Healthy($"{consumers.Length} consumer(s) all healthy.");
});
Operations Viewer Dashboard
SimplyWorks.Bus.RabbitMqViewer adds a server-rendered operations dashboard built with Pico.css + HTMX. No JavaScript framework required. All tables auto-refresh via HTMX polling with CSS animations.
dotnet add package SimplyWorks.Bus.RabbitMqViewer
Pages
| Route | Content |
|---|---|
/bus-viewer |
Overview — summary cards, consumer health table, active alerts, live event feed |
/bus-viewer/consumers |
Consumer Health — full grid with status badges, rates, prefetch, priority |
/bus-viewer/queues |
Queue Details — main + retry + dead-letter depths and rates per queue |
/bus-viewer/retries |
Retry Analysis — consumers with retry backlogs ordered by severity |
/bus-viewer/dead-letters |
Dead-Letter Inspection — counts, last exception, last failure timestamp |
/bus-viewer/events |
Live Events — filterable operational event stream with inline exception details |
/bus-viewer/login |
Login page — form-based credential entry (no browser credential caching) |
/bus-viewer/logout |
Logout — clears session cookie and redirects to login |
UI features
- 🌗 Dark / Light mode toggle — persisted in
localStorage; theme restored before first paint to prevent flash - ⚠️ Critical status tooltips — hovering a "⚠ Critical" badge shows a bullet list of exactly why the consumer is critical (disconnected nodes, dead-letter count, retry count, backpressure, publish/ack imbalance)
- ✨ Live refresh animations — a blue-purple shimmer bar sweeps the top edge of each table zone while loading; each
<tbody>row slides up and fades in with a 60 ms stagger on data arrival - 🔴 Pulsing live-data dot — a green dot next to "Updated HH:mm:ss" confirms live polling is active
- Data auto-refreshes: consumer/queue/retry tables every 10 s, events every 5 s, dead-letters every 15 s
Registration
// Program.cs / Startup.ConfigureServices — call after AddBus()
builder.Services.AddBusViewer(o => o.UseBasicAuth());
// Middleware pipeline
app.UseStaticFiles(); // serves /_content/SimplyWorks.Bus.RabbitMqViewer/bus-viewer.css
app.UseRouting();
app.UseAuthentication(); // required for RequirePolicy() mode
app.UseAuthorization();
app.UseBusViewer();
app.MapRazorPages(); // discovers BusViewer area automatically
Note:
UseBusViewer()can be placed at any position in the pipeline. WhenUseBasicAuth()is active, anIStartupFilterautomatically inserts the viewer's authentication gate at the very beginning of the pipeline — beforeUseAuthorization— so 403 errors from the host application's authorization policies can never block the viewer.
Authentication
Mode 1 — Form-based login (UseBasicAuth())
Credentials are validated against IConfiguration at request time (supports secret rotation without restart). On success a short-lived session cookie is issued:
- Cookie
.BusViewerAuth, path-scoped to/bus-viewer,HttpOnly,SameSite=Strict - Session-only: deleted when the browser closes, 8-hour absolute maximum
- No browser credential caching (unlike HTTP Basic auth browser dialogs)
- Logout at
/bus-viewer/logoutclears the cookie immediately
builder.Services.AddBusViewer(o => o.UseBasicAuth(
usernameConfigKey: "BusViewer:Username", // default key
passwordConfigKey: "BusViewer:Password")); // default key
{
"BusViewer": { "Username": "ops", "Password": "super-secret" }
}
Environment variable equivalents: BusViewer__Username, BusViewer__Password
Mode 2 — Decoupled / policy (recommended for production)
Delegates entirely to the host's authorization pipeline. Any scheme works (JWT, cookie, OIDC, Windows Auth):
builder.Services.AddAuthorization(o =>
o.AddPolicy("OpsOnly", p => p.RequireRole("ops")));
builder.Services.AddBusViewer(o => o.RequirePolicy("OpsOnly"));
Mode 3 — Anonymous (development only)
Throws InvalidOperationException at startup when IHostEnvironment.IsProduction() is true unless explicitly suppressed:
builder.Services.AddBusViewer(o =>
{
o.AllowAnonymous();
o.AllowAnonymousInProduction = true; // only if behind proxy/network policy
});
Viewer options reference
| Option | Default | Description |
|---|---|---|
Title |
"SW.Bus Operations" |
Header title displayed in the sidebar |
AuthMode |
None |
Set via RequirePolicy(), UseBasicAuth(), or AllowAnonymous() |
PolicyName |
null |
Policy name used when AuthMode = Policy |
UsernameConfigKey |
"BusViewer:Username" |
Config key for the login username |
PasswordConfigKey |
"BusViewer:Password" |
Config key for the login password |
AllowAnonymousInProduction |
false |
Suppress production guard for anonymous mode |
Full BusOptions Reference
services.AddBus(config =>
{
// ── Identity ──────────────────────────────────────────────────────────
config.ApplicationName = "OrderService";
// ── Connection ────────────────────────────────────────────────────────
config.HeartBeatTimeOut = 60; // heartbeat seconds (0 = off)
config.ManagementUrl = "http://localhost:15672"; // defaults from AMQP URI
config.ManagementUsername = "guest";
config.ManagementPassword = "guest";
config.VirtualHost = "/";
// ── Queue defaults ────────────────────────────────────────────────────
config.DefaultQueuePrefetch = 4; // QoS prefetch per consumer
config.DefaultRetryCount = 5; // retries before dead-letter
config.DefaultRetryAfter = 60; // seconds between retries
config.DefaultMaxPriority = 0; // 0 = priority queues disabled
// ── Per-queue overrides ───────────────────────────────────────────────
config.AddQueueOption("OrderConsumer.OrderCreated",
prefetch: 10, retryCount: 3, retryAfterSeconds: 30, priority: 5);
// ── Broadcast listeners ───────────────────────────────────────────────
config.ListenRetryCount = 5;
config.ListenRetryAfter = 60;
// ── Monitoring ────────────────────────────────────────────────────────
config.MonitoringCacheSeconds = 5; // cache management API responses (3–60)
// ── JWT context propagation ───────────────────────────────────────────
config.Token.Key = "your-secret-key";
config.Token.Issuer = "your-issuer";
config.Token.Audience = "your-audience";
// ── Operational events pipeline ───────────────────────────────────────
config.OperationalEventsEnabled = true;
config.OperationalEventsBufferCapacity = 8192; // channel buffer before drop
config.OperationalEventsBatchSize = 256; // events per flush batch
config.OperationalEventsFlushIntervalMs = 1000; // ms between flushes
config.OperationalEventsDropOldest = true; // drop strategy when buffer full
config.OperationalEventsSchemaVersion = "1.0"; // stamped on every event
config.OperationalEventsStoreCapacity = 10000; // in-memory ring buffer size
// ── Alert thresholds ─────────────────────────────────────────────────
config.QueueBackpressureThreshold = 5000; // queue depth → backpressure event
config.AlertRetryWarningThreshold = 10; // retry count → Warning alert
config.AlertRetryCriticalThreshold = 100; // retry count → Critical alert
config.AlertDeadLetterCriticalThreshold = 100; // DL count → Critical alert
});
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ Process Exchange ──► Consumer Queue ──► Retry Queue │
│ Dead-Letter Exchange ──────────────────► Bad Queue │
│ Node Exchange ─────► Per-Instance Queue (broadcasts) │
└─────────────────────────────────────────────────────────────────┘
▲ │
│ publish │ consume
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ SimplyWorks.Bus Runtime │
│ BasicPublisher ─── ActivitySource ─── OperationalEventPublisher│
│ ConsumerRunner ─── ActivitySource ─── OperationalEventPublisher│
│ ConsumersService ──────────────────── OperationalEventPublisher│
│ │
│ OperationalEventDispatcher (BackgroundService) │
│ ├── InMemoryOperationalEventStore (ring buffer) │
│ └── [custom IOperationalEventBatchSink sinks] │
│ │
│ BusDashboardDataService ── IConsumerReader (+ cache) │
│ ── IOperationalEventStore │
│ ── IAlertEvaluator │
└─────────────────────────────────────────────────────────────────┘
│
▼ (optional)
┌─────────────────────────────────────────────────────────────────┐
│ SimplyWorks.Bus.RabbitMqViewer │
│ /bus-viewer Overview dashboard │
│ /bus-viewer/consumers Consumer health grid │
│ /bus-viewer/queues Queue depths & rates │
│ /bus-viewer/retries Retry analysis │
│ /bus-viewer/dead-letters Dead-letter inspection │
│ /bus-viewer/events Live operational event stream │
└─────────────────────────────────────────────────────────────────┘
Queue naming convention:
{env}.{app}.{ConsumerClass}.{MessageType}
{env}.{app}.{ConsumerClass}.{MessageType}.retry
{env}.{app}.{ConsumerClass}.{MessageType}.bad
Example with ApplicationName = "OrderService" in Development:
v3.development.orderservice.ordercreatedconsumer.ordercreated
v3.development.orderservice.ordercreatedconsumer.ordercreated.retry
v3.development.orderservice.ordercreatedconsumer.ordercreated.bad
Testing
// Use mock publisher — logs publish calls without sending to RabbitMQ
services.AddBusPublishMock();
// IBusDashboardDataService, IConsumerReader, IErrorQueueReader are standard
// interfaces — mock them with any mocking library in unit tests
Dependencies
| Package | Version | Used for |
|---|---|---|
RabbitMQ.Client |
6.8.1 | AMQP connection and channel management |
EasyNetQ.Management.Client |
3.0.1 | RabbitMQ Management API calls |
Scrutor |
4.2.2 | Assembly scanning for consumers |
SimplyWorks.HttpExtensions |
8.1.1 | JWT and request context propagation |
SimplyWorks.PrimitiveTypes |
8.1.3 | RequestContext, IConsume<T>, etc. |
Contributing
- Fork the repository
- Create a feature branch
- Make your changes with tests
- Submit a pull request
License
MIT — see LICENSE
Support
- Issues: github.com/simplify9/SW-Bus/issues
- Sample app:
SW.Bus.SampleWebproject in this repository
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- EasyNetQ.Management.Client (>= 3.0.1)
- RabbitMQ.Client (>= 6.8.1)
- Scrutor (>= 4.2.2)
- SimplyWorks.Bus.RabbitMqExtensions (>= 8.1.9)
- SimplyWorks.HttpExtensions (>= 8.1.1)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on SimplyWorks.Bus:
| Package | Downloads |
|---|---|
|
ChequeScore.PrimitiveTypes.EFUtils
ChequeScore EF Utils |
|
|
SimplyWorks.Bus.RabbitMqViewer
Built-in HTMX operations dashboard for SimplyWorks.Bus. Mounts a server-rendered dark-mode admin UI (Pico.css + HTMX) at a configurable route with consumer health, queue metrics, retry analysis, dead-letter inspection, and live operational events — zero JavaScript framework required. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 8.1.9 | 97 | 5/11/2026 |
| 8.1.8 | 174 | 4/9/2026 |
| 8.1.7 | 798 | 1/25/2026 |
| 8.1.6 | 135 | 1/25/2026 |
| 8.1.5 | 220 | 1/20/2026 |
| 8.1.4 | 184 | 1/12/2026 |
| 8.1.3 | 157 | 1/8/2026 |
| 8.1.2 | 133 | 1/4/2026 |
| 8.1.1 | 157 | 12/31/2025 |
| 8.1.0 | 428 | 9/16/2025 |
| 8.0.11 | 3,126 | 1/27/2025 |
| 8.0.10 | 609 | 11/7/2024 |
| 8.0.9 | 250 | 10/20/2024 |
| 8.0.4 | 541 | 6/27/2024 |
| 8.0.3 | 255 | 6/27/2024 |
| 8.0.2 | 350 | 6/2/2024 |
| 8.0.1 | 360 | 5/7/2024 |
| 6.0.24 | 2,751 | 12/16/2024 |
| 6.0.23 | 2,822 | 11/7/2024 |
| 6.0.22 | 895 | 10/17/2024 |
See https://github.com/simplify9/SW-Bus/releases for release notes and changelog.