Softoverse.EventBus.InMemory
10.9.0
A lightweight, high-performance in-memory event bus for .NET 10+ applications. Enables decoupled communication through the publish-subscribe pattern with built-in support for scheduled events. Perfect for domain events, CQRS, and event-driven architectures within a single application.
Table of Contents
- Features
- Installation
- Quick Start
- Configuration
- Usage Examples
- Processing Mode
- Advanced Scenarios
- Best Practices
- OpenTelemetry Integration
- Troubleshooting
- Contributing
Features
- High Performance: Built on System.Threading.Channels for optimal throughput
- Async First: Non-blocking event publishing and processing
- Event Scheduling: Schedule events to be processed at specific times or after relative delays
- Type-Safe: Strongly-typed contracts with compile-time safety
- Flexible: Extensible architecture with custom processors
- Concurrency Control: Built-in capacity management
- Rich Logging: Comprehensive logging for monitoring
- DI Native: First-class dependency injection support
- Request-Response: Support for query patterns via InvokeAsync
Installation
dotnet add package Softoverse.EventBus.InMemory
Quick Start
Step 1: Define Your Events
using Softoverse.EventBus.InMemory.Abstractions;
public class OrderCreatedEvent : IEvent
{
public string OrderId { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
}
Step 2: Create Event Handlers
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedHandler> _logger;
public OrderCreatedHandler(ILogger<OrderCreatedHandler> logger)
{
_logger = logger;
}
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct = default)
{
_logger.LogInformation("Processing order {OrderId}", @event.OrderId);
// Your business logic here
await Task.CompletedTask;
}
}
Step 3: Register in Program.cs
using Softoverse.EventBus.InMemory;
using Softoverse.EventBus.InMemory.Infrastructure.Processors;
var builder = WebApplication.CreateBuilder(args);
// Register with InMemoryEventProcessor (recommended for most cases)
builder.Services.AddEventBus(
builder.Configuration,
[typeof(Program).Assembly] // Assemblies containing your handlers
);
var app = builder.Build();
app.Run();
Step 4: Configure appsettings.json
{
"EventBusSettings": {
"EventProcessorCapacity": 10
}
}
Step 5: Publish Events
public class OrderService
{
private readonly IEventBus _eventBus;
public OrderService(IEventBus eventBus)
{
_eventBus = eventBus;
}
public async Task CreateOrderAsync(CreateOrderRequest request)
{
// Your order creation logic goes here. It is assumed to produce an `order`
// object exposing Id, Amount, and CustomerId.
// Publish event
await _eventBus.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
Amount = order.Amount,
CustomerId = order.CustomerId
});
}
}
Configuration
Configuration Options
| Property | Description | Default |
|---|---|---|
| EventProcessorCapacity | Max concurrent event processors | 10 |
| ChannelCapacity | Channel buffer size (-1 = unbounded) | -1 |
| ExecuteAfterSeconds | Interval, in seconds, for checking scheduled events | 2 |
| RetryCount | Maximum retry attempts (for custom processors) | 10 |
| EachRetryInterval | Seconds between retries | 3 |
Example Configuration
{
"EventBusSettings": {
"EventProcessorCapacity": 10,
"ChannelCapacity": -1,
"ExecuteAfterSeconds": 2,
"RetryCount": 10,
"EachRetryInterval": 3
}
}
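The ChannelCapacity description above (-1 = unbounded) maps naturally onto the two channel kinds in System.Threading.Channels. The following is a minimal sketch of that mapping, not code taken from the library: ChannelFactory and its Create method are hypothetical names, and the choice of BoundedChannelFullMode.Wait is an assumption about how a bounded buffer would behave.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical helper for illustration; not part of Softoverse.EventBus.InMemory.
public static class ChannelFactory
{
    // A negative capacity yields an unbounded channel; a positive one yields
    // a bounded channel. Zero is rejected by BoundedChannelOptions itself.
    public static Channel<T> Create<T>(int capacity)
    {
        if (capacity < 0)
        {
            return Channel.CreateUnbounded<T>();
        }

        return Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            // Assumed policy: asynchronous writers wait for free space
            // instead of dropping items.
            FullMode = BoundedChannelFullMode.Wait
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var bounded = ChannelFactory.Create<string>(2);
        Console.WriteLine(bounded.Writer.TryWrite("a")); // True
        Console.WriteLine(bounded.Writer.TryWrite("b")); // True
        Console.WriteLine(bounded.Writer.TryWrite("c")); // False: buffer is full

        var unbounded = ChannelFactory.Create<string>(-1);
        var allAccepted = true;
        for (var i = 0; i < 1000; i++)
        {
            allAccepted &= unbounded.Writer.TryWrite($"event-{i}");
        }
        Console.WriteLine(allAccepted); // True: no capacity limit applies
    }
}
```

With an unbounded buffer, publishers never wait but memory use grows with the backlog. A bounded buffer trades that for back-pressure on publishers.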
Usage Examples
Publishing Events
Single Event
await _eventBus.PublishAsync(new OrderCreatedEvent { OrderId = "123" });
Multiple Events
var events = new List<OrderCreatedEvent>
{
new() { OrderId = "123" },
new() { OrderId = "124" },
new() { OrderId = "125" }
};
await _eventBus.BulkPublishAsync(events);
Scheduling Events
Events can be scheduled with an absolute DateTimeOffset or with a relative TimeSpan delay.
Schedule Single Event
// Option 1: schedule for an absolute point in time
var reminderDate = DateTimeOffset.UtcNow.AddDays(7);
await _eventBus.ScheduleAsync(
new SubscriptionReminderEvent { SubscriptionId = "sub-123" },
reminderDate
);
// Option 2: schedule after a relative delay from now
await _eventBus.ScheduleAsync(
new SubscriptionReminderEvent { SubscriptionId = "sub-123" },
TimeSpan.FromDays(7)
);
Schedule Multiple Events
var reminders = new List<SubscriptionReminderEvent>
{
new() { SubscriptionId = "sub-123", DaysRemaining = 7 },
new() { SubscriptionId = "sub-124", DaysRemaining = 7 }
};
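// Option 1: schedule all events for an absolute point in time
var scheduleTime = DateTimeOffset.UtcNow.AddDays(7);
await _eventBus.BulkScheduleAsync(reminders, scheduleTime);
// Option 2: schedule all events after a relative delay from now
await _eventBus.BulkScheduleAsync(reminders, TimeSpan.FromDays(7));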
Scheduled Event Status Tracking
The EventBus includes comprehensive status tracking for scheduled events to prevent duplicate execution and provide visibility into event processing.
Note: The built-in ScheduledEventStore is in-memory. Scheduled events and their statuses are kept only while the application process is running. If the application stops, restarts, or crashes, pending scheduled events in this store are lost.
Status Types
- Pending: Event is waiting to be processed
- InProgress: Event is currently being processed
- Done: Event completed successfully
- Failed: Event processing failed (with failure reason)
- Skipped: Event was skipped and won't be processed
Automatic Status Management
The system automatically manages event statuses to prevent duplicate execution:
- When an event becomes due, it's immediately marked as InProgress
- If processing succeeds, it's marked as Done and removed from the store
- If processing fails, it's marked as Failed with the error message and removed
- Events stuck in InProgress for too long (default: 5 minutes) are considered stale and can be retried
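The status rules above can be read as a small state machine. The sketch below models a single scheduled entry under those rules. It is an illustration only: ScheduledEntry, EntryStatus, and TryClaim are hypothetical names, not the library's types, and the class is not thread-safe.

```csharp
using System;

public enum EntryStatus { Pending, InProgress, Done, Failed, Skipped }

// Hypothetical model of one scheduled entry; not the library's own type.
public sealed class ScheduledEntry
{
    public EntryStatus Status { get; private set; } = EntryStatus.Pending;
    public DateTimeOffset DueAt { get; }
    public DateTimeOffset StatusUpdatedAt { get; private set; }
    public string Remarks { get; private set; } = "";

    public ScheduledEntry(DateTimeOffset dueAt, DateTimeOffset createdAt)
    {
        DueAt = dueAt;
        StatusUpdatedAt = createdAt;
    }

    // Claims the entry if it is due, or if a previous claim has gone stale.
    public bool TryClaim(DateTimeOffset now, TimeSpan inProgressTimeout)
    {
        var due = Status == EntryStatus.Pending && DueAt <= now;
        var stale = Status == EntryStatus.InProgress
                    && now - StatusUpdatedAt > inProgressTimeout;
        if (!due && !stale)
        {
            return false;
        }

        Transition(EntryStatus.InProgress, now, "");
        return true;
    }

    public void Complete(DateTimeOffset now) => Transition(EntryStatus.Done, now, "");

    public void Fail(DateTimeOffset now, string reason) =>
        Transition(EntryStatus.Failed, now, reason);

    private void Transition(EntryStatus status, DateTimeOffset now, string remarks)
    {
        Status = status;
        StatusUpdatedAt = now;
        Remarks = remarks;
    }
}

public static class Program
{
    public static void Main()
    {
        var t0 = DateTimeOffset.UtcNow;
        var timeout = TimeSpan.FromMinutes(5);
        var entry = new ScheduledEntry(t0.AddSeconds(10), t0);

        Console.WriteLine(entry.TryClaim(t0, timeout));                // False: not due yet
        Console.WriteLine(entry.TryClaim(t0.AddSeconds(10), timeout)); // True: due, now InProgress
        Console.WriteLine(entry.TryClaim(t0.AddSeconds(11), timeout)); // False: already claimed
        Console.WriteLine(entry.TryClaim(t0.AddMinutes(6), timeout));  // True: earlier claim is stale

        entry.Fail(t0.AddMinutes(7), "handler threw");
        Console.WriteLine($"{entry.Status}: {entry.Remarks}");         // Failed: handler threw
    }
}
```

The third call shows why a poller that fires again while a handler is still running does not start a second execution. The fourth shows the stale-claim escape hatch that the timeout provides.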
Configuring InProgress Timeout
You can control how long an event stays in InProgress state before being considered stale:
// In your startup/configuration
builder.Services.AddSingleton<ScheduledEventStore>(sp =>
{
var logger = sp.GetRequiredService<ILogger<ScheduledEventStore>>();
var store = new ScheduledEventStore(logger)
{
InProgressTimeoutMinutes = 10 // Default is 5 minutes
};
return store;
});
Querying Event Status
You can query events by status for monitoring or debugging:
// Get the ScheduledEventStore from DI
var store = serviceProvider.GetRequiredService<ScheduledEventStore>();
// Get all pending events
var pendingEvents = store.GetEventsByStatus(ScheduledEventStatus.Pending);
// Get all failed events to investigate issues
var failedEvents = store.GetEventsByStatus(ScheduledEventStatus.Failed);
foreach (var entry in failedEvents)
{
Console.WriteLine($"Event {entry.Id} failed: {entry.Remarks}");
}
// Get all in-progress events
var inProgressEvents = store.GetEventsByStatus(ScheduledEventStatus.InProgress);
// Manually mark an event as skipped
store.MarkAsSkipped(eventId, "Cancelled by user");
// Get total count of scheduled events
int totalEvents = store.Count;
// Get next scheduled event time
DateTimeOffset? nextEventTime = store.GetNextScheduledTime();
Why Status Tracking Matters
Problem: When debugging handlers (e.g., using breakpoints), the background service continues polling and may pick up the same event multiple times, causing duplicate execution.
Solution: Status tracking ensures:
- Events marked as InProgress won't be picked up again (unless they time out)
- Failed events are tracked with failure reasons for troubleshooting
- You can query and monitor the state of all scheduled events
- Race conditions between overlapping polling cycles are prevented
Example: Monitoring Scheduled Events
public class ScheduledEventMonitor : BackgroundService
{
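private readonly ScheduledEventStore _store;
private readonly ILogger<ScheduledEventMonitor> _logger;
// Both dependencies are resolved from DI
public ScheduledEventMonitor(
ScheduledEventStore store,
ILogger<ScheduledEventMonitor> logger)
{
_store = store;
_logger = logger;
}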
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var pending = _store.GetEventsByStatus(ScheduledEventStatus.Pending);
var inProgress = _store.GetEventsByStatus(ScheduledEventStatus.InProgress);
var failed = _store.GetEventsByStatus(ScheduledEventStatus.Failed);
_logger.LogInformation(
"Scheduled Events - Pending: {Pending}, InProgress: {InProgress}, Failed: {Failed}",
pending.Count, inProgress.Count, failed.Count);
// Alert if events are stuck in InProgress for too long
foreach (var evt in inProgress)
{
var stuckDuration = DateTimeOffset.UtcNow - evt.StatusUpdatedAt;
if (stuckDuration > TimeSpan.FromMinutes(15))
{
_logger.LogWarning(
"Event {EventId} has been InProgress for {Duration} minutes",
evt.Id, stuckDuration.TotalMinutes);
}
}
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
}
Request-Response Pattern
For queries that need responses:
// 1. Define a request handler
public class GetOrderQueryHandler : IRequestHandler<GetOrderQuery, OrderDto>
{
public async Task<OrderDto> HandleAsync(GetOrderQuery query, CancellationToken ct)
{
// Fetch and return order
return await _repository.GetOrderAsync(query.OrderId);
}
}
// 2. Use InvokeAsync
var result = await _eventBus.InvokeAsync<OrderDto>(new GetOrderQuery
{
OrderId = "123"
});
Multiple Handlers for Same Event
// Handler 1: Send email
public class OrderEmailHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendOrderConfirmationAsync(@event.CustomerId);
}
}
// Handler 2: Update inventory
public class OrderInventoryHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _inventoryService.ReserveItemsAsync(@event.OrderId);
}
}
// Handler 3: Log audit
public class OrderAuditHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _auditService.LogOrderCreatedAsync(@event);
}
}
// All three handlers execute in parallel when the event is published
Processing Mode
The EventBus uses a Channel-based architecture for high-performance asynchronous processing with background workers.
Benefits:
- Non-blocking publishers (immediate return)
- High throughput via background processing
- Built-in concurrency control
- Automatic scheduling support
Characteristics:
- High event volume support
- Optimized for performance
- Eventual consistency model
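To make the channel-based model concrete, here is a minimal, self-contained sketch of the general technique: publishers write to a channel and return immediately, while a background loop drains it and caps the number of concurrently running handlers. MiniBus is a hypothetical class written for this illustration. It does not reproduce the library's internals, and using a SemaphoreSlim for the cap (analogous to EventProcessorCapacity) is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical miniature bus; not the library's implementation.
public sealed class MiniBus
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
    private readonly SemaphoreSlim _slots;
    private readonly Func<string, Task> _handler;

    public MiniBus(int processorCapacity, Func<string, Task> handler)
    {
        _slots = new SemaphoreSlim(processorCapacity);
        _handler = handler;
    }

    // Returns as soon as the event is queued; the handler runs later.
    public ValueTask PublishAsync(string evt) => _channel.Writer.WriteAsync(evt);

    // Signals that no more events will be published.
    public void Complete() => _channel.Writer.Complete();

    // Background loop: reads events and starts handlers, never more than
    // processorCapacity at the same time.
    public async Task RunAsync()
    {
        var running = new List<Task>();
        await foreach (var evt in _channel.Reader.ReadAllAsync())
        {
            await _slots.WaitAsync();
            running.Add(ProcessAsync(evt));
        }
        await Task.WhenAll(running);
    }

    private async Task ProcessAsync(string evt)
    {
        try
        {
            await _handler(evt);
        }
        catch (Exception ex)
        {
            // Isolate the failure so other events keep flowing.
            Console.Error.WriteLine($"Handler failed for {evt}: {ex.Message}");
        }
        finally
        {
            _slots.Release();
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var gate = new object();
        int current = 0, peak = 0;

        var bus = new MiniBus(processorCapacity: 2, handler: async evt =>
        {
            lock (gate) { current++; peak = Math.Max(peak, current); }
            await Task.Delay(50); // simulated work
            lock (gate) { current--; }
        });

        var worker = bus.RunAsync();
        for (var i = 1; i <= 5; i++)
        {
            await bus.PublishAsync($"event-{i}");
        }
        bus.Complete();
        await worker;

        Console.WriteLine($"Peak concurrency: {peak}"); // never exceeds 2
    }
}
```

Because publishing only enqueues, a caller cannot assume handlers have run when PublishAsync returns, which is the eventual consistency noted above.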
Advanced Scenarios
Custom Event Processor
If you need custom behavior (retry logic, circuit breakers, etc.), create a custom processor:
public class CustomEventProcessor : IEventProcessor
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger _logger;
public CustomEventProcessor(
IServiceScopeFactory scopeFactory,
ILogger<CustomEventProcessor> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
// Custom pre-processing logic
_logger.LogInformation("Custom processing for {EventType}", @event.GetType().Name);
await ProcessEventHandlersAsync(@event, ct);
}
public async Task ProcessScheduledEventAsync(
IEvent @event,
DateTimeOffset scheduledTime,
CancellationToken ct)
{
// Calculate delay
var delay = scheduledTime.ToUniversalTime() - DateTimeOffset.UtcNow;
if (delay > TimeSpan.Zero)
{
await Task.Delay(delay, ct);
}
await ProcessEventHandlersAsync(@event, ct);
}
public async Task ProcessEventHandlersAsync(IEvent @event, CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var handlers = scope.ServiceProvider.GetServices<IEventHandler>();
var applicableHandlers = handlers.Where(h => h.CanHandle(@event));
var tasks = applicableHandlers.Select(h =>
SafeHandleAsync(h, @event, ct));
await Task.WhenAll(tasks);
}
public async Task<TResult> InvokeAsync<TResult>(
object @event,
CancellationToken ct)
{
// Custom invoke logic
await using var scope = _scopeFactory.CreateAsyncScope();
var handlerType = typeof(IRequestHandler<,>)
.MakeGenericType(@event.GetType(), typeof(TResult));
var handler = scope.ServiceProvider.GetService(handlerType);
if (handler is IRequestHandler baseHandler)
{
var result = await baseHandler.HandleAsync(@event, ct);
return (TResult)result!;
}
return default!;
}
private async Task SafeHandleAsync(
IEventHandler handler,
IEvent @event,
CancellationToken ct)
{
try
{
await handler.HandleAsync(@event, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Handler {Handler} failed", handler.GetType().Name);
// Don't rethrow - isolate failures
}
}
}
Register your custom processor:
builder.Services.AddEventBus<CustomEventProcessor>(
builder.Configuration,
[typeof(Program).Assembly]
);
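The retry logic mentioned at the start of this section could be driven by the RetryCount and EachRetryInterval settings from the configuration table. The helper below is one possible shape for it, offered as a sketch: Retry.RunAsync is a hypothetical name, and the fixed delay between attempts is an assumption rather than documented library behavior.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; a custom processor could wrap handler calls with it.
public static class Retry
{
    // Runs the action once, then retries up to retryCount more times,
    // waiting a fixed interval between attempts. The last failure propagates.
    public static async Task RunAsync(
        Func<CancellationToken, Task> action,
        int retryCount,
        TimeSpan interval,
        CancellationToken ct = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await action(ct);
                return;
            }
            catch (Exception) when (attempt < retryCount && !ct.IsCancellationRequested)
            {
                await Task.Delay(interval, ct);
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var calls = 0;

        // Fails twice, then succeeds on the third attempt.
        await Retry.RunAsync(_ =>
        {
            calls++;
            if (calls < 3)
            {
                throw new InvalidOperationException("transient failure");
            }
            return Task.CompletedTask;
        }, retryCount: 10, interval: TimeSpan.FromMilliseconds(10));

        Console.WriteLine(calls); // 3
    }
}
```

Inside a custom processor, the call to a handler's HandleAsync could be passed as the action, with the two values read from EventBusSettings.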
Handler with Scoped Dependencies
public class OrderHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IOrderRepository _repository;
private readonly IEmailService _emailService;
private readonly ILogger _logger;
public OrderHandler(
IOrderRepository repository,
IEmailService emailService,
ILogger<OrderHandler> logger)
{
_repository = repository;
_emailService = emailService;
_logger = logger;
}
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
var order = await _repository.GetByIdAsync(@event.OrderId);
if (order != null)
{
await _emailService.SendConfirmationAsync(order.CustomerId, order);
_logger.LogInformation("Order confirmation sent for {OrderId}", order.Id);
}
}
}
Conditional Handling
Handlers can implement conditional logic:
public class PremiumCustomerHandler : IEventHandler<OrderCreatedEvent>
{
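private readonly ICustomerService _customerService;
public PremiumCustomerHandler(ICustomerService customerService)
{
_customerService = customerService;
}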
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
var customer = await _customerService.GetByIdAsync(@event.CustomerId);
// Only process for premium customers
if (customer.IsPremium)
{
// Apply special premium processing
await ApplyPremiumBenefitsAsync(customer, @event);
}
}
}
Best Practices
1. Keep Handlers Focused
Each handler should have a single responsibility:
// ✅ Good - Single responsibility
public class SendOrderEmailHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendOrderConfirmationAsync(@event);
}
}
// ❌ Bad - Multiple responsibilities
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendEmail(@event);
await _inventoryService.UpdateInventory(@event);
await _analyticsService.TrackOrder(@event);
await _loyaltyService.AddPoints(@event);
}
}
2. Use Channel Mode for Production
For production workloads, use Channel mode for better performance:
{
"EventBusSettings": {
"EventBusType": "Channel",
"EventProcessorCapacity": 10
}
}
3. Handle Exceptions Gracefully
Don't let one handler failure affect others:
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
try
{
await _service.ProcessOrderAsync(@event);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process order {OrderId}", @event.OrderId);
// Consider: Dead letter queue, retry logic, alerting, etc.
}
}
4. Use Descriptive Event Names
// ✅ Good
public class OrderCreatedEvent : IEvent { }
public class PaymentProcessedEvent : IEvent { }
public class InventoryReservedEvent : IEvent { }
// ❌ Bad
public class Event1 : IEvent { }
public class DataChanged : IEvent { }
5. Leverage Scheduled Events
Use scheduling for time-based workflows:
// Schedule reminder 7 days before expiry
var reminderDate = subscription.ExpiryDate.AddDays(-7);
await _eventBus.ScheduleAsync(
new SubscriptionExpiryReminderEvent { SubscriptionId = subscription.Id },
reminderDate
);
// Or schedule relative to now
await _eventBus.ScheduleAsync(
new SubscriptionExpiryReminderEvent { SubscriptionId = subscription.Id },
TimeSpan.FromDays(7)
);
6. Test with Both Modes
Test with General mode for easier debugging, then use Channel mode for production:
// appsettings.Development.json
{
"EventBusSettings": {
"EventBusType": "General"
}
}
// appsettings.Production.json
{
"EventBusSettings": {
"EventBusType": "Channel",
"EventProcessorCapacity": 20
}
}
7. Monitor Event Processing
Add structured logging to track event flow:
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
using var scope = _logger.BeginScope(new Dictionary<string, object>
{
["OrderId"] = @event.OrderId,
["EventType"] = nameof(OrderCreatedEvent)
});
_logger.LogInformation("Processing order creation");
await ProcessOrderAsync(@event);
_logger.LogInformation("Order processing completed");
}
OpenTelemetry Integration
The EventBus library includes built-in support for distributed tracing using OpenTelemetry. This allows you to trace event publishing, processing, and handler execution in observability platforms like Seq, Jaeger, Zipkin, or Application Insights.
For detailed documentation, examples, and troubleshooting, see the OpenTelemetry Integration Guide.
Note: If you see detailed trace output in your console (Activity.TraceId, Activity.SpanId, etc.), this is completely normal when using .AddConsoleExporter(). It is intended for development and debugging and shows that tracing is working correctly. For production, use a proper APM exporter instead.
Activity Source
The library exposes an ActivitySource named "Softoverse.EventBus.InMemory" that creates traces for all event bus operations.
Quick Setup
1. Install OpenTelemetry Packages
# Core OpenTelemetry packages
dotnet add package OpenTelemetry.Extensions.Hosting
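dotnet add package OpenTelemetry.Instrumentation.AspNetCore
dotnet add package OpenTelemetry.Instrumentation.Http # Required for AddHttpClientInstrumentation()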
# Choose your exporter (examples below)
dotnet add package OpenTelemetry.Exporter.Console # For console output
dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol # For OTLP (Seq, etc.)
dotnet add package OpenTelemetry.Exporter.Zipkin # For Zipkin
dotnet add package OpenTelemetry.Exporter.Jaeger # For Jaeger (this exporter package is deprecated; recent Jaeger versions also accept OTLP)
2. Configure OpenTelemetry in Program.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
var builder = WebApplication.CreateBuilder(args);
// Add EventBus
builder.Services.AddEventBus(builder.Configuration, [typeof(Program).Assembly]);
// Configure OpenTelemetry
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing =>
{
tracing
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
// Add EventBus tracing
.AddSource("Softoverse.EventBus.InMemory");
// Console exporter for development (produces detailed output in console)
if (builder.Environment.IsDevelopment())
{
tracing.AddConsoleExporter();
}
// Use your preferred production exporter (Seq, Jaeger, Application Insights, etc.)
});
var app = builder.Build();
app.Run();
Exporter Examples
Export to Seq
// Install: dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddSource("Softoverse.EventBus.InMemory")
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://localhost:5341/ingest/otlp/v1/traces");
options.Protocol = OpenTelemetry.Exporter.OtlpExportProtocol.HttpProtobuf;
})
);
Seq Configuration: In Seq, go to Settings → API Keys and create an OTLP ingestion key.
Export to Jaeger
// Install: dotnet add package OpenTelemetry.Exporter.Jaeger
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddSource("Softoverse.EventBus.InMemory")
.AddJaegerExporter(options =>
{
options.AgentHost = "localhost";
options.AgentPort = 6831;
})
);
Export to Application Insights
// Install: dotnet add package Azure.Monitor.OpenTelemetry.Exporter
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddSource("Softoverse.EventBus.InMemory")
.AddAzureMonitorTraceExporter(options =>
{
options.ConnectionString = builder.Configuration["ApplicationInsights:ConnectionString"];
})
);
Traced Operations
The EventBus creates traces for the following operations:
| Activity Name | Description | Tags |
|---|---|---|
| EventBus.Publish | Single event publication | eventbus.event.type, eventbus.event.id |
| EventBus.BulkPublish | Bulk event publication | eventbus.event.type, eventbus.event.count |
| EventBus.Schedule | Schedule single event | eventbus.event.type, eventbus.scheduled_time |
| EventBus.BulkSchedule | Schedule multiple events | eventbus.event.type, eventbus.event.count, eventbus.scheduled_time |
| EventBus.Invoke | Request-response invocation | eventbus.event.type, eventbus.result.type |
| EventBus.ProcessEvent | Event processing | eventbus.event.type, eventbus.processing.status |
| EventBus.HandleEvent | Individual handler execution | eventbus.event.type, eventbus.handler.type |
| EventBus.Channel.Read | Reading from channel | eventbus.channel.type (Publishing/Scheduling) |
| EventBus.Channel.Process | Channel processing | eventbus.channel.type, eventbus.processing.status |
| EventBus.ScheduledEvent.Check | Scheduled event check | eventbus.event.count, eventbus.processed.count |
Trace Tags (Attributes)
Each trace includes relevant tags for filtering and analysis:
- eventbus.event.type: The name of the event type
- eventbus.event.id: The unique identifier of the event (if available)
- eventbus.event.count: Number of events in bulk operations
- eventbus.handler.type: The name of the handler processing the event
- eventbus.scheduled_time: When the event is scheduled to execute
- eventbus.processing.status: Status of the operation (success, failed, no_handlers, etc.)
- eventbus.error.type: Type of exception when operation fails
- eventbus.channel.type: Type of channel (Publishing or Scheduling)
Example: Viewing Traces in Seq
After configuring the Seq exporter:
- Publish an event in your application
- Open Seq at http://localhost:5341
- Navigate to the Traces view
- Search for traces containing EventBus.Publish
- Click on a trace to see the full span hierarchy:
EventBus.Publish (OrderCreatedEvent)
└── EventBus.ProcessEvent
    ├── EventBus.HandleEvent (SendOrderEmailHandler)
    ├── EventBus.HandleEvent (OrderInventoryHandler)
    └── EventBus.HandleEvent (OrderAuditHandler)
Example: Custom Tracing in Handlers
You can add custom spans in your handlers:
using System.Diagnostics;
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
private static readonly ActivitySource ActivitySource = new("MyApplication.Handlers");
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("ProcessOrder");
activity?.SetTag("order.id", @event.OrderId);
activity?.SetTag("order.amount", @event.Amount);
// Your processing logic
await ProcessOrderAsync(@event);
activity?.SetStatus(ActivityStatusCode.Ok);
}
}
Remember to register your custom ActivitySource:
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource("Softoverse.EventBus.InMemory")
.AddSource("MyApplication.Handlers") // Add your custom source
.AddOtlpExporter(/* ... */)
);
Filtering Traces
Use trace tags to filter and analyze specific scenarios:
In Seq:
@Properties.eventbus.event.type = 'OrderCreatedEvent'
@Properties.eventbus.processing.status = 'failed'
In Jaeger:
- Filter by tag: eventbus.event.type=OrderCreatedEvent
- Filter by operation: EventBus.HandleEvent
Performance Considerations
- OpenTelemetry tracing has minimal performance impact (<1% in most scenarios)
- Activities are only created when an ActivityListener is registered
- Activities are only created for actual event processing. Idle operations (empty channel reads, scheduled event checks with no due events) don't create traces, to avoid noise
- Use sampling in high-throughput scenarios to reduce overhead:
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource("Softoverse.EventBus.InMemory")
.SetSampler(new TraceIdRatioBasedSampler(0.1)) // Sample 10% of traces
.AddOtlpExporter()
);
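The point above about activities only being created when an ActivityListener is registered is standard System.Diagnostics behavior and can be checked in isolation. The sketch below uses a throwaway source name, "Demo.Source", chosen for this illustration. In a real application the OpenTelemetry SDK registers the listener for every source passed to AddSource.

```csharp
using System;
using System.Diagnostics;

public static class Program
{
    public static void Main()
    {
        // "Demo.Source" is a hypothetical name used only for this sketch.
        using var source = new ActivitySource("Demo.Source");

        // Without a listener, StartActivity returns null and costs almost nothing.
        using (var before = source.StartActivity("NoListener"))
        {
            Console.WriteLine(before is null); // True
        }

        // Register a listener that samples everything from the demo source.
        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == "Demo.Source",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData
        };
        ActivitySource.AddActivityListener(listener);

        // With a listener in place, a real Activity is created.
        using (var after = source.StartActivity("WithListener"))
        {
            Console.WriteLine(after is not null); // True
        }
    }
}
```

This is also why handler code should use the null-conditional form, such as activity?.SetTag(...), when adding tags.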
Troubleshooting
Events Not Processing
- Check Configuration: Ensure EventBusSettings is in appsettings.json
- Verify Registration: Handlers must be in assemblies passed to AddEventBus()
- Check Logs: Look for errors in application logs
Scheduled Events Not Executing
- Verify InMemoryEventProcessor: Use the non-generic AddEventBus() overload (the one without a custom processor type argument)
- Check Time: Ensure the scheduled time is in the future
- Review Logs: Check ScheduledEventProcessingHostedService logs
- Check Status: Query ScheduledEventStore.GetEventsByStatus() to see if events are stuck in InProgress or Failed state
Events Executing Multiple Times
- Status Tracking: The system automatically marks events as InProgress to prevent duplicate execution
- Timeout Configuration: If events take longer to process, increase InProgressTimeoutMinutes on ScheduledEventStore
- Check for Errors: Review failed events using GetEventsByStatus(ScheduledEventStatus.Failed) to identify issues
Performance Issues
- Increase Capacity: Adjust EventProcessorCapacity based on load
- Use Channel Mode: Ensure you are using "Channel" mode for production
- Optimize Handlers: Profile slow handlers and optimize them
Additional Resources
- Architecture Documentation: See ARCHITECTURE.md for detailed implementation details
- GitHub Repository: https://github.com/softoverse/EventBus
- NuGet Package: https://www.nuget.org/packages/Softoverse.EventBus.InMemory/
Contributing
Contributions are welcome! Please see ARCHITECTURE.md to understand the codebase, then:
- Fork the repository
- Create a feature branch
- Make your changes
- Submit a pull request
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Made with ❤️ by Softoverse
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Microsoft.Extensions.Configuration.Binder (>= 10.0.7)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.7)