Softoverse.EventBus.InMemory 10.7.0

Softoverse.EventBus.InMemory

A lightweight, high-performance in-memory event bus for .NET 10+ applications. Enables decoupled communication through the publish-subscribe pattern with built-in support for scheduled events. Perfect for domain events, CQRS, and event-driven architectures within a single application.

✨ Features

  • ⚡ High Performance: Built on System.Threading.Channels for optimal throughput
  • 🔄 Async First: Non-blocking event publishing and processing
  • 📅 Event Scheduling: Schedule events to be processed at specific times
  • 🛡️ Type-Safe: Strongly-typed contracts with compile-time safety
  • 🔧 Flexible: Two processing modes (Channel/General)
  • 🧵 Concurrency Control: Built-in capacity management
  • 📝 Rich Logging: Comprehensive logging for monitoring
  • 🧩 DI Native: First-class dependency injection support
  • 🎯 Request-Response: Support for query patterns via InvokeAsync

📦 Installation

dotnet add package Softoverse.EventBus.InMemory

🚀 Quick Start

Step 1: Define Your Events

using Softoverse.EventBus.InMemory.Abstractions;

public class OrderCreatedEvent : IEvent
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
    public string CustomerId { get; set; }
}

Step 2: Create Event Handlers

public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly ILogger<OrderCreatedHandler> _logger;
    
    public OrderCreatedHandler(ILogger<OrderCreatedHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct = default)
    {
        _logger.LogInformation("Processing order {OrderId}", @event.OrderId);
        
        // Your business logic here
        await Task.CompletedTask;
    }
}

Step 3: Register in Program.cs

using Softoverse.EventBus.InMemory;

var builder = WebApplication.CreateBuilder(args);

// Register with DefaultEventProcessor (recommended for most cases)
builder.Services.AddEventBus(
    builder.Configuration,
    [typeof(Program).Assembly]  // Assemblies containing your handlers
);

var app = builder.Build();
app.Run();

Step 4: Configure appsettings.json

{
  "EventBusSettings": {
    "EventBusType": "Channel",
    "EventProcessorCapacity": 10
  }
}

Step 5: Publish Events

public class OrderService
{
    private readonly IEventBus _eventBus;

    public OrderService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task CreateOrderAsync(CreateOrderRequest request)
    {
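        // Your order creation logic goes here. SaveOrderAsync is a hypothetical
        // helper standing in for it; the event below needs the resulting `order`.
        var order = await SaveOrderAsync(request);
        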
        // Publish event
        await _eventBus.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            Amount = order.Amount,
            CustomerId = order.CustomerId
        });
    }
}

⚙️ Configuration

Configuration Options

| Property | Description | Default |
|---|---|---|
| EventBusType | Processing mode: "Channel" or "General" | "Channel" |
| EventProcessorCapacity | Max concurrent event processors | 10 |
| ChannelCapacity | Channel buffer size (-1 = unbounded) | -1 |
| ExecuteAfterSeconds | Interval, in seconds, for checking scheduled events | 2 |
| RetryCount | Maximum retry attempts (for custom processors) | 10 |
| EachRetryInterval | Seconds between retries | 3 |

Example Configuration

{
  "EventBusSettings": {
    "EventBusType": "Channel",
    "EventProcessorCapacity": 10,
    "ChannelCapacity": -1,
    "ExecuteAfterSeconds": 2,
    "RetryCount": 10,
    "EachRetryInterval": 3
  }
}
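
A quick sanity check of these values at startup can catch typos early. The sketch below is not part of the package: `Validate` and `ReadInt` are hypothetical helpers, and the rules they apply (known mode, positive capacity, -1 or a positive channel size) are assumptions read off the options above rather than documented constraints. It uses only System.Text.Json.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

const string json = """
{
  "EventBusSettings": {
    "EventBusType": "Channel",
    "EventProcessorCapacity": 10,
    "ChannelCapacity": -1,
    "ExecuteAfterSeconds": 2
  }
}
""";

using var doc = JsonDocument.Parse(json);
var problems = Validate(doc.RootElement.GetProperty("EventBusSettings"));

Console.WriteLine(problems.Count == 0
    ? "EventBusSettings look consistent."
    : string.Join(Environment.NewLine, problems));

// Reads an integer option, falling back to the documented default when it is absent.
static int ReadInt(JsonElement section, string name, int fallback) =>
    section.TryGetProperty(name, out var value) ? value.GetInt32() : fallback;

// Checks the section against rules assumed from the options table (hypothetical helper).
static List<string> Validate(JsonElement section)
{
    var found = new List<string>();

    var mode = section.TryGetProperty("EventBusType", out var m) ? m.GetString() : "Channel";
    if (mode is not ("Channel" or "General"))
        found.Add($"EventBusType '{mode}' is not \"Channel\" or \"General\".");

    if (ReadInt(section, "EventProcessorCapacity", 10) < 1)
        found.Add("EventProcessorCapacity should be at least 1.");

    var channelCapacity = ReadInt(section, "ChannelCapacity", -1);
    if (channelCapacity != -1 && channelCapacity < 1)
        found.Add("ChannelCapacity should be -1 (unbounded) or a positive number.");

    if (ReadInt(section, "ExecuteAfterSeconds", 2) < 1)
        found.Add("ExecuteAfterSeconds should be at least 1.");

    return found;
}
```

In a real application the same checks could run against the bound configuration section instead of a JSON string.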

📖 Usage Examples

Publishing Events

Single Event

await _eventBus.PublishAsync(new OrderCreatedEvent { OrderId = "123" });

Multiple Events

var events = new List<OrderCreatedEvent>
{
    new() { OrderId = "123" },
    new() { OrderId = "124" },
    new() { OrderId = "125" }
};

await _eventBus.BulkPublishAsync(events);

Scheduling Events

Schedule Single Event

var reminderDate = DateTimeOffset.UtcNow.AddDays(7);
await _eventBus.ScheduleAsync(
    new SubscriptionReminderEvent { SubscriptionId = "sub-123" },
    reminderDate
);

Schedule Multiple Events

var reminders = new List<SubscriptionReminderEvent>
{
    new() { SubscriptionId = "sub-123", DaysRemaining = 7 },
    new() { SubscriptionId = "sub-124", DaysRemaining = 7 }
};

var scheduleTime = DateTimeOffset.UtcNow.AddDays(7);
await _eventBus.BulkScheduleAsync(reminders, scheduleTime);

Request-Response Pattern

For queries that need responses:

// 1. Define a request handler
public class GetOrderQueryHandler : IRequestHandler<GetOrderQuery, OrderDto>
{
    public async Task<OrderDto> HandleAsync(GetOrderQuery query, CancellationToken ct)
    {
        // Fetch and return order
        return await _repository.GetOrderAsync(query.OrderId);
    }
}

// 2. Use InvokeAsync
var result = await _eventBus.InvokeAsync<OrderDto>(new GetOrderQuery 
{ 
    OrderId = "123" 
});

Multiple Handlers for Same Event

// Handler 1: Send email
public class OrderEmailHandler : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        await _emailService.SendOrderConfirmationAsync(@event.CustomerId);
    }
}

// Handler 2: Update inventory
public class OrderInventoryHandler : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        await _inventoryService.ReserveItemsAsync(@event.OrderId);
    }
}

// Handler 3: Log audit
public class OrderAuditHandler : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        await _auditService.LogOrderCreatedAsync(@event);
    }
}

// All three handlers execute in parallel when the event is published

🔄 Processing Modes

Channel Mode

High-performance asynchronous processing with background workers.

Benefits:

  • Non-blocking publishers (immediate return)
  • High throughput via background processing
  • Built-in concurrency control
  • Automatic scheduling support

Use When:

  • High event volume
  • Performance is critical
  • Eventual consistency is acceptable

{
  "EventBusSettings": {
    "EventBusType": "Channel"
  }
}
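
To make the Channel mode properties concrete, here is a minimal sketch of the general pattern using System.Threading.Channels directly. It is not the package's implementation: `workerCount` only plays the role that EventProcessorCapacity is described as having, and the unbounded channel mirrors a ChannelCapacity of -1. The writer returns as soon as an item is queued, while a fixed number of background workers drain the queue concurrently.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Unbounded channel: writers never wait for space.
var channel = Channel.CreateUnbounded<string>();
const int workerCount = 3; // hypothetical stand-in for EventProcessorCapacity
var processed = 0;

// Start the background workers; each one reads until the channel is completed.
var workers = Enumerable.Range(0, workerCount)
    .Select(async workerId =>
    {
        await foreach (var orderId in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(10); // stand-in for real handler work
            Interlocked.Increment(ref processed);
            Console.WriteLine($"worker {workerId} handled order {orderId}");
        }
    })
    .ToArray();

// "Publishing" only enqueues, so the caller is not blocked by handler work.
foreach (var orderId in new[] { "123", "124", "125" })
    channel.Writer.TryWrite(orderId);

channel.Writer.Complete();   // signal that no more events will arrive
await Task.WhenAll(workers); // wait for the workers to drain the queue

Console.WriteLine($"processed {processed} events");
```

Which worker handles which order varies between runs, which is the eventual-consistency trade-off listed above.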

General Mode

Synchronous immediate processing.

Benefits:

  • Immediate processing
  • Simpler debugging
  • Immediate consistency

Use When:

  • Low event volume
  • Immediate consistency required
  • Testing/debugging

{
  "EventBusSettings": {
    "EventBusType": "General"
  }
}

🚀 Advanced Scenarios

Custom Event Processor

If you need custom behavior (retry logic, circuit breakers, etc.), create a custom processor:

public class CustomEventProcessor : IEventProcessor
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger _logger;

    public CustomEventProcessor(
        IServiceScopeFactory scopeFactory, 
        ILogger<CustomEventProcessor> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
    {
        // Custom pre-processing logic
        _logger.LogInformation("Custom processing for {EventType}", @event.GetType().Name);
        
        await ProcessEventHandlersAsync(@event, ct);
    }

    public async Task ProcessScheduledEventAsync(
        IEvent @event, 
        DateTimeOffset scheduledTime, 
        CancellationToken ct)
    {
        // Calculate delay
        var delay = scheduledTime.ToUniversalTime() - DateTimeOffset.UtcNow;
        
        if (delay > TimeSpan.Zero)
        {
            await Task.Delay(delay, ct);
        }
        
        await ProcessEventHandlersAsync(@event, ct);
    }

    public async Task ProcessEventHandlersAsync(IEvent @event, CancellationToken ct)
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var handlers = scope.ServiceProvider.GetServices<IEventHandler>();
        
        var applicableHandlers = handlers.Where(h => h.CanHandle(@event));
        
        var tasks = applicableHandlers.Select(h => 
            SafeHandleAsync(h, @event, ct));
        
        await Task.WhenAll(tasks);
    }

    public async Task<TResult> InvokeAsync<TResult>(
        object @event, 
        CancellationToken ct)
    {
        // Custom invoke logic
        await using var scope = _scopeFactory.CreateAsyncScope();
        
        var handlerType = typeof(IRequestHandler<,>)
            .MakeGenericType(@event.GetType(), typeof(TResult));
        
        var handler = scope.ServiceProvider.GetService(handlerType);
        
        if (handler is IRequestHandler baseHandler)
        {
            var result = await baseHandler.HandleAsync(@event, ct);
            return (TResult)result!;
        }
        
        return default!;
    }

    private async Task SafeHandleAsync(
        IEventHandler handler, 
        IEvent @event, 
        CancellationToken ct)
    {
        try
        {
            await handler.HandleAsync(@event, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Handler {Handler} failed", handler.GetType().Name);
            // Don't rethrow - isolate failures
        }
    }
}

Register your custom processor:

builder.Services.AddEventBus<CustomEventProcessor>(
    builder.Configuration,
    [typeof(Program).Assembly]
);

Handler with Scoped Dependencies

public class OrderHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly IOrderRepository _repository;
    private readonly IEmailService _emailService;
    private readonly ILogger _logger;

    public OrderHandler(
        IOrderRepository repository,
        IEmailService emailService,
        ILogger<OrderHandler> logger)
    {
        _repository = repository;
        _emailService = emailService;
        _logger = logger;
    }

    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        var order = await _repository.GetByIdAsync(@event.OrderId);
        
        if (order != null)
        {
            await _emailService.SendConfirmationAsync(order.CustomerId, order);
            _logger.LogInformation("Order confirmation sent for {OrderId}", order.Id);
        }
    }
}

Conditional Handling

Handlers can implement conditional logic:

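public class PremiumCustomerHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly ICustomerService _customerService;

    public PremiumCustomerHandler(ICustomerService customerService)
    {
        _customerService = customerService;
    }

    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        var customer = await _customerService.GetByIdAsync(@event.CustomerId);
        
        // Only process for premium customers; unknown customers are skipped
        if (customer is { IsPremium: true })
        {
            // Apply special premium processing
            // (ApplyPremiumBenefitsAsync is your own method, not part of the package)
            await ApplyPremiumBenefitsAsync(customer, @event);
        }
    }
}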

🎯 Best Practices

1. Keep Handlers Focused

Each handler should have a single responsibility:

// ✅ Good - Single responsibility
public class SendOrderEmailHandler : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        await _emailService.SendOrderConfirmationAsync(@event);
    }
}

// ❌ Bad - Multiple responsibilities
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
    {
        await _emailService.SendEmail(@event);
        await _inventoryService.UpdateInventory(@event);
        await _analyticsService.TrackOrder(@event);
        await _loyaltyService.AddPoints(@event);
    }
}

2. Use Channel Mode for Production

For production workloads, use Channel mode for better performance:

{
  "EventBusSettings": {
    "EventBusType": "Channel",
    "EventProcessorCapacity": 10
  }
}

3. Handle Exceptions Gracefully

Don't let one handler failure affect others:

public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
    try
    {
        await _service.ProcessOrderAsync(@event);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to process order {OrderId}", @event.OrderId);
        // Consider: Dead letter queue, retry logic, alerting, etc.
    }
}
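
The retry logic mentioned in the comment above could look like the following fixed-interval helper. This is only a sketch: `RunWithRetryAsync` is a hypothetical name, and its `maxAttempts` and `delay` parameters merely resemble the RetryCount and EachRetryInterval settings; how the package itself applies those settings is not shown in this README.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo: an operation that fails twice, then succeeds on the third attempt.
var calls = 0;
await RunWithRetryAsync(
    () =>
    {
        calls++;
        if (calls < 3) throw new InvalidOperationException("transient failure");
        return Task.CompletedTask;
    },
    maxAttempts: 5,
    delay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"succeeded after {calls} attempts");

// Runs the operation up to maxAttempts times, waiting a fixed delay after each failure.
// The exception from the final attempt is allowed to propagate to the caller.
static async Task RunWithRetryAsync(
    Func<Task> operation,
    int maxAttempts,
    TimeSpan delay,
    CancellationToken ct = default)
{
    if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await operation();
            return;
        }
        catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
        {
            // Cancellation is never retried; other failures wait and try again.
            await Task.Delay(delay, ct);
        }
    }
}
```

Inside a handler, the call to the failing service would be wrapped in the helper, with the existing try/catch kept as the last line of defense.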

4. Use Descriptive Event Names

// ✅ Good
public class OrderCreatedEvent : IEvent { }
public class PaymentProcessedEvent : IEvent { }
public class InventoryReservedEvent : IEvent { }

// ❌ Bad
public class Event1 : IEvent { }
public class DataChanged : IEvent { }

5. Leverage Scheduled Events

Use scheduling for time-based workflows:

// Schedule reminder 7 days before expiry
var reminderDate = subscription.ExpiryDate.AddDays(-7);
await _eventBus.ScheduleAsync(
    new SubscriptionExpiryReminderEvent { SubscriptionId = subscription.Id },
    reminderDate
);

6. Test with Both Modes

Test with General mode for easier debugging, then use Channel mode for production:

// appsettings.Development.json
{
  "EventBusSettings": {
    "EventBusType": "General"
  }
}

// appsettings.Production.json
{
  "EventBusSettings": {
    "EventBusType": "Channel",
    "EventProcessorCapacity": 20
  }
}

7. Monitor Event Processing

Add structured logging to track event flow:

public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
    using var scope = _logger.BeginScope(new Dictionary<string, object>
    {
        ["OrderId"] = @event.OrderId,
        ["EventType"] = nameof(OrderCreatedEvent)
    });
    
    _logger.LogInformation("Processing order creation");
    await ProcessOrderAsync(@event);
    _logger.LogInformation("Order processing completed");
}

🛠️ Troubleshooting

Events Not Processing

  1. Check Configuration: Ensure EventBusSettings is in appsettings.json
  2. Verify Registration: Handlers must be in assemblies passed to AddEventBus()
  3. Check Logs: Look for errors in application logs

Scheduled Events Not Executing

  1. Verify DefaultEventProcessor: Use the non-generic AddEventBus(...) overload from the Quick Start, which registers DefaultEventProcessor
  2. Check Time: Ensure scheduled time is in the future
  3. Review Logs: Check ScheduledEventProcessingHostedService logs
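
The "Check Time" item is easy to trip over when the schedule is computed, for example seven days before an expiry that is only three days away. The README does not say how the package treats a time that is already past, so the sketch below simply clamps such times to a moment slightly after "now". `ClampToFuture` and its `margin` parameter are hypothetical.

```csharp
using System;

var now = new DateTimeOffset(2026, 2, 7, 12, 0, 0, TimeSpan.Zero);
var expiresSoon = now.AddDays(3);

// Seven days before an expiry that is three days away lies in the past.
var naive = expiresSoon.AddDays(-7);
var safe = ClampToFuture(naive, now, TimeSpan.FromSeconds(5));

Console.WriteLine($"naive: {naive:u}");
Console.WriteLine($"safe:  {safe:u}");

// Returns the requested time, or now + margin when the request is not far enough ahead.
static DateTimeOffset ClampToFuture(DateTimeOffset requested, DateTimeOffset now, TimeSpan margin)
{
    var earliest = now + margin;
    return requested < earliest ? earliest : requested;
}
```

In application code `now` would be DateTimeOffset.UtcNow, and the clamped value would be passed to ScheduleAsync.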

Performance Issues

  1. Increase Capacity: Adjust EventProcessorCapacity based on load
  2. Use Channel Mode: Set EventBusType to "Channel" for production workloads
  3. Optimize Handlers: Profile slow handlers and optimize them
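
Before reaching for a full profiler, a rough first measurement of a handler can be taken with Stopwatch. The helper below is a hypothetical sketch, not something the package provides; `MeasureAsync` wraps any asynchronous operation and returns how long it took.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Task.Delay stands in for a call to a handler's HandleAsync.
var elapsed = await MeasureAsync(() => Task.Delay(50));
Console.WriteLine($"handler took {elapsed.TotalMilliseconds:F0} ms");

// Times a single asynchronous operation.
static async Task<TimeSpan> MeasureAsync(Func<Task> handler)
{
    var stopwatch = Stopwatch.StartNew();
    await handler();
    return stopwatch.Elapsed;
}
```

Logging the elapsed time per event type makes it easier to see which handlers would benefit from a higher EventProcessorCapacity and which simply need optimizing.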

🤝 Contributing

Contributions are welcome! Please see ARCHITECTURE.md to understand the codebase, then:

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Submit a pull request

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.


Made with ❤️ by Softoverse

Target frameworks

net10.0 is compatible. The following were computed: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

| Version | Downloads | Last Updated |
|---|---|---|
| 10.9.0 | 118 | 4/25/2026 |
| 10.8.0 | 130 | 2/16/2026 |
| 10.7.0 | 118 | 2/7/2026 |
| 10.6.0 | 109 | 2/6/2026 |
| 10.5.0 | 121 | 1/25/2026 |
| 1.0.0 | 231 | 11/15/2025 |