Indiko.Blocks.EventBus.RabbitMQ 2.1.2

dotnet add package Indiko.Blocks.EventBus.RabbitMQ --version 2.1.2
                    
NuGet\Install-Package Indiko.Blocks.EventBus.RabbitMQ -Version 2.1.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.2" />
                    
Directory.Packages.props
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Indiko.Blocks.EventBus.RabbitMQ --version 2.1.2
                    
#r "nuget: Indiko.Blocks.EventBus.RabbitMQ, 2.1.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Indiko.Blocks.EventBus.RabbitMQ@2.1.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.2
                    
Install as a Cake Addin
#tool nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.2
                    
Install as a Cake Tool

Indiko.Blocks.EventBus.RabbitMQ

RabbitMQ-based distributed event bus implementation for microservices and scalable event-driven architectures.

Overview

This package provides a production-ready RabbitMQ implementation of the event bus abstractions, enabling reliable message delivery across distributed systems with features like persistence, acknowledgments, and automatic reconnection.

Features

  • RabbitMQ Integration: Full RabbitMQ messaging support via EasyNetQ
  • Distributed Events: Publish-subscribe across multiple services
  • Durable Messages: Persistent message storage
  • Automatic Reconnection: Handles connection failures gracefully
  • Queue Management: Automatic queue creation and routing
  • Message Acknowledgment: Reliable message delivery
  • Dead Letter Queues: Failed message handling
  • Multiple Consumers: Scale horizontally with competing consumers
  • Auto-Discovery: Automatic handler registration from DI container

Installation

dotnet add package Indiko.Blocks.EventBus.RabbitMQ

Prerequisites

  • RabbitMQ server (3.8+)
  • Management plugin enabled (optional, for monitoring)

Quick Start

Install RabbitMQ

Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
Local Installation

Configure Services

using Indiko.Blocks.EventBus.RabbitMQ;

public class Startup : WebStartup
{
    public override void ConfigureServices(IServiceCollection services)
    {
        base.ConfigureServices(services);
        
        // Configure RabbitMQ event bus
        services.AddRabbitMQEventBus(options =>
        {
            options.Host = Configuration["RabbitMQ:Host"];
            options.Port = Configuration.GetValue<int>("RabbitMQ:Port");
            options.Username = Configuration["RabbitMQ:Username"];
            options.Password = Configuration["RabbitMQ:Password"];
            options.VirtualHost = Configuration["RabbitMQ:VirtualHost"];
            options.ReConnectOnConnectionLost = true;
        });
        
        // Register event handlers
        services.AddScoped<IEventHandler<UserCreatedEvent>, SendWelcomeEmailHandler>();
        services.AddScoped<IEventHandler<OrderPlacedEvent>, ProcessOrderHandler>();
    }
}

Configuration (appsettings.json)

{
  "RabbitMQ": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "PrefetchCount": 10,
    "Timeout": 30,
    "ReConnectOnConnectionLost": true,
    "Product": "MyApplication",
    "Platform": ".NET 10"
  }
}

Define Events

using Indiko.Blocks.EventBus.Abstractions.Interfaces;

public class UserCreatedEvent : IEvent
{
    public Guid UserId { get; set; }
    public string Email { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTime CreatedAt { get; set; }
}

public class OrderPlacedEvent : IEvent
{
    public Guid OrderId { get; set; }
    public Guid UserId { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime OrderDate { get; set; }
}

Implement Handlers

Service A - User Service

public class UserService
{
    private readonly IEventBus _eventBus;
    private readonly IUserRepository _userRepository;

    public async Task<User> CreateUserAsync(CreateUserDto dto)
    {
        var user = new User
        {
            Id = Guid.NewGuid(),
            Email = dto.Email,
            FirstName = dto.FirstName,
            LastName = dto.LastName,
            CreatedAt = DateTime.UtcNow
        };
        
        await _userRepository.AddAsync(user);
        
        // Publish event to RabbitMQ
        await _eventBus.PublishAsync(new UserCreatedEvent
        {
            UserId = user.Id,
            Email = user.Email,
            FirstName = user.FirstName,
            LastName = user.LastName,
            CreatedAt = user.CreatedAt
        });
        
        return user;
    }
}

Service B - Email Service

public class SendWelcomeEmailHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<SendWelcomeEmailHandler> _logger;

    public SendWelcomeEmailHandler(IEmailService emailService, ILogger<SendWelcomeEmailHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation($"Sending welcome email to {event.Email}");
        
        try
        {
            await _emailService.SendWelcomeEmailAsync(
                @event.Email,
                @event.FirstName,
                cancellationToken
            );
            
            _logger.LogInformation($"Welcome email sent to {event.Email}");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"Failed to send welcome email to {event.Email}");
            throw; // RabbitMQ will retry or move to dead letter queue
        }
    }
}

Service C - Analytics Service

public class TrackUserRegistrationHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IAnalyticsService _analyticsService;

    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await _analyticsService.TrackEventAsync(new AnalyticsEvent
        {
            EventType = "UserRegistration",
            UserId = @event.UserId,
            Timestamp = @event.CreatedAt,
            Properties = new Dictionary<string, object>
            {
                { "email_domain", @event.Email.Split('@')[1] }
            }
        });
    }
}

How It Works

Automatic Handler Registration

The RabbitMQ event bus automatically discovers and registers all handlers from the DI container:

public void AddAllHandlersFromServiceProvider(IServiceProvider serviceProvider)
{
    // Scans all assemblies for IEventHandler<TEvent> implementations
    var eventHandlerTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(a => a.GetTypes())
        .Where(t => t.GetInterfaces()
            .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IEventHandler<>)));

    // Registers each handler with RabbitMQ
    foreach (var handlerType in eventHandlerTypes)
    {
        var handler = serviceProvider.GetService(handlerType);
        if (handler != null)
        {
            RegisterEventHandler(handler);
        }
    }
}

Queue Naming Convention

Each event type gets its own queue:

UserCreatedEvent ? UserCreatedEvent_Queue
OrderPlacedEvent ? OrderPlacedEvent_Queue
PaymentProcessedEvent ? PaymentProcessedEvent_Queue

Message Flow

Publisher (Service A)
    ?
[RabbitMQ Exchange]
    ?
[UserCreatedEvent_Queue]
    ?
Consumers (Services B, C, D...)

Connection Management

Auto-Reconnection

private void Advanced_Disconnected(object sender, DisconnectedEventArgs e)
{
    _logger.LogWarning($"Disconnected from RabbitMQ: {e.Reason}");

    if (_options.ReConnectOnConnectionLost)
    {
        _logger.LogInformation("Reconnecting to RabbitMQ...");
        UnRegisterAllEventHandlers();
        AddAllHandlersFromServiceProvider(_serviceProvider);
        _logger.LogInformation("Reconnected to RabbitMQ");
    }
}

Connection Events

_bus.Advanced.Connected += Advanced_Connected;
_bus.Advanced.Disconnected += Advanced_Disconnected;
_bus.Advanced.MessageReturned += Advanced_MessageReturned;

Advanced Configurations

Multiple RabbitMQ Instances

{
  "RabbitMQ": {
    "Host": "rabbitmq-cluster.example.com",
    "Port": 5672,
    "Username": "app-user",
    "Password": "secure-password",
    "VirtualHost": "/production",
    "Timeout": 60,
    "PrefetchCount": 20
  }
}

Connection String Builder

var connectionString = RabbitConnectionStringBuilder
    .Init(options)
    .Build();

// Produces: host=localhost:5672;virtualHost=/;username=guest;password=guest

Manual Handler Registration

public void Configure(IApplicationBuilder app, IEventBus eventBus, IServiceProvider services)
{
    // Manual registration if needed
    var handler = services.GetRequiredService<IEventHandler<UserCreatedEvent>>();
    eventBus.RegisterEventHandler(handler);
}

Error Handling and Retries

Automatic Retries

RabbitMQ automatically retries failed messages based on configuration:

public class ResilientEventHandler : IEventHandler<OrderPlacedEvent>
{
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessOrderAsync(@event);
        }
        catch (TransientException ex)
        {
            // Throw to trigger RabbitMQ retry
            _logger.LogWarning($"Transient error, will retry: {ex.Message}");
            throw;
        }
        catch (PermanentException ex)
        {
            // Log and return (don't throw) to acknowledge message
            _logger.LogError($"Permanent error, message will be dropped: {ex.Message}");
            // Message is acknowledged and won't be retried
        }
    }
}

Dead Letter Queue

Configure dead letter queues for failed messages:

services.AddRabbitMQEventBus(options =>
{
    options.Host = "localhost";
    options.DeadLetterExchange = "dlx";
    options.DeadLetterQueue = "failed_messages";
});

Scaling

Horizontal Scaling

Run multiple instances of your service - RabbitMQ distributes messages:

Service Instance 1 ??
Service Instance 2 ???? [UserCreatedEvent_Queue] ?? RabbitMQ
Service Instance 3 ??

Each message is delivered to only ONE instance (competing consumers).

Message Prefetch

Control how many messages each consumer processes at once:

{
  "RabbitMQ": {
    "PrefetchCount": 10
  }
}

Monitoring

RabbitMQ Management UI

Access at: http://localhost:15672

  • Default credentials: guest/guest
  • View queues, exchanges, message rates
  • Monitor consumer connections

Application Logging

_logger.LogInformation("Connected to RabbitMQ at {host}:{port}", options.Host, options.Port);
_logger.LogDebug("Registered event handler {handler} for {event}", handlerType, eventType);
_logger.LogWarning("Disconnected from RabbitMQ: {reason}", disconnectReason);

Best Practices

  1. Idempotent Handlers: Design handlers to be idempotent (handle duplicates)
  2. Event Versioning: Plan for event schema evolution
  3. Correlation IDs: Include correlation IDs for tracing
  4. Small Events: Keep event payloads small
  5. Error Handling: Distinguish transient vs permanent errors
  6. Connection Pooling: Reuse connections
  7. Monitoring: Set up alerts for queue depths

Deployment

Docker Compose

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq

  user-service:
    build: ./UserService
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    depends_on:
      - rabbitmq

  email-service:
    build: ./EmailService
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    depends_on:
      - rabbitmq

volumes:
  rabbitmq-data:

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        env:
        - name: RabbitMQ__Host
          value: "rabbitmq-service"
        - name: RabbitMQ__Username
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        - name: RabbitMQ__Password
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password

Migration from InMemory

Simply change the registration:

// From
services.AddInMemoryEventBus();

// To
services.AddRabbitMQEventBus(Configuration);

All events and handlers work without modification!

Target Framework

  • .NET 10

Dependencies

  • Indiko.Blocks.EventBus.Abstractions
  • EasyNetQ (8.0+)
  • RabbitMQ.Client (6.0+)

License

See LICENSE file in the repository root.

  • Indiko.Blocks.EventBus.Abstractions - Core event bus abstractions
  • Indiko.Blocks.EventBus.InMemory - In-memory event bus for development
  • Indiko.Blocks.Mediation.Abstractions - CQRS and mediator pattern

Resources

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.2 269 12/18/2025
2.1.1 669 12/2/2025
2.1.0 676 12/2/2025
2.0.0 319 9/17/2025
1.7.23 184 9/8/2025
1.7.22 182 9/8/2025
1.7.21 194 8/14/2025
1.7.20 204 6/23/2025
1.7.19 202 6/3/2025
1.7.18 198 5/29/2025
1.7.17 195 5/26/2025
1.7.15 150 4/12/2025
1.7.14 162 4/11/2025
1.7.13 160 3/29/2025
1.7.12 178 3/28/2025
1.7.11 182 3/28/2025
1.7.10 194 3/28/2025
1.7.9 183 3/28/2025
1.7.8 184 3/28/2025
1.7.5 211 3/17/2025
1.7.4 204 3/16/2025
1.7.3 176 3/16/2025
1.7.2 211 3/16/2025
1.7.1 231 3/11/2025
1.6.8 232 3/11/2025
1.6.7 286 3/4/2025
1.6.6 156 2/26/2025
1.6.5 187 2/20/2025
1.6.4 179 2/20/2025
1.6.3 170 2/5/2025
1.6.2 181 1/24/2025
1.6.1 184 1/24/2025
1.6.0 142 1/16/2025
1.5.2 145 1/16/2025
1.5.1 193 11/3/2024
1.5.0 160 10/26/2024
1.3.2 178 10/24/2024
1.3.0 178 10/10/2024
1.2.5 201 10/9/2024
1.2.4 178 10/8/2024
1.2.1 163 10/3/2024
1.2.0 161 9/29/2024
1.1.1 169 9/23/2024
1.1.0 197 9/18/2024
1.0.33 209 9/15/2024
1.0.28 173 8/28/2024
1.0.27 176 8/24/2024
1.0.26 165 7/7/2024
1.0.25 171 7/6/2024
1.0.24 163 6/25/2024
1.0.23 170 6/1/2024
1.0.22 191 5/14/2024
1.0.21 156 5/14/2024
1.0.20 184 4/8/2024
1.0.19 178 4/3/2024
1.0.18 163 3/23/2024
1.0.17 199 3/19/2024
1.0.16 203 3/19/2024
1.0.15 165 3/11/2024
1.0.14 180 3/10/2024
1.0.13 178 3/6/2024
1.0.12 201 3/1/2024
1.0.11 209 3/1/2024
1.0.10 193 3/1/2024
1.0.9 186 3/1/2024
1.0.8 167 2/19/2024
1.0.7 177 2/17/2024
1.0.6 174 2/17/2024
1.0.5 183 2/17/2024
1.0.4 183 2/7/2024
1.0.3 161 2/6/2024
1.0.1 195 2/6/2024
1.0.0 232 1/9/2024
1.0.0-preview99 172 12/22/2023
1.0.0-preview98 155 12/21/2023
1.0.0-preview97 138 12/21/2023
1.0.0-preview96 163 12/20/2023
1.0.0-preview95 170 12/20/2023
1.0.0-preview94 152 12/18/2023
1.0.0-preview93 289 12/13/2023
1.0.0-preview92 151 12/13/2023
1.0.0-preview91 222 12/12/2023
1.0.0-preview90 141 12/11/2023
1.0.0-preview89 142 12/11/2023
1.0.0-preview88 246 12/6/2023
1.0.0-preview87 195 12/6/2023
1.0.0-preview86 178 12/6/2023
1.0.0-preview85 180 12/6/2023
1.0.0-preview84 166 12/5/2023
1.0.0-preview83 218 12/5/2023
1.0.0-preview82 176 12/5/2023
1.0.0-preview81 162 12/4/2023
1.0.0-preview80 163 12/1/2023
1.0.0-preview77 155 12/1/2023
1.0.0-preview76 167 12/1/2023
1.0.0-preview75 146 12/1/2023
1.0.0-preview74 185 11/26/2023
1.0.0-preview73 174 11/7/2023
1.0.0-preview72 155 11/6/2023
1.0.0-preview71 175 11/3/2023
1.0.0-preview70 160 11/2/2023
1.0.0-preview69 156 11/2/2023
1.0.0-preview68 169 11/2/2023
1.0.0-preview67 143 11/2/2023
1.0.0-preview66 149 11/2/2023
1.0.0-preview65 160 11/2/2023
1.0.0-preview64 184 11/2/2023
1.0.0-preview63 154 11/2/2023
1.0.0-preview62 142 11/1/2023
1.0.0-preview61 156 11/1/2023
1.0.0-preview60 149 11/1/2023
1.0.0-preview59 181 11/1/2023
1.0.0-preview58 171 10/31/2023
1.0.0-preview57 151 10/31/2023
1.0.0-preview56 172 10/31/2023
1.0.0-preview55 154 10/31/2023
1.0.0-preview54 147 10/31/2023
1.0.0-preview53 146 10/31/2023
1.0.0-preview52 154 10/31/2023
1.0.0-preview51 151 10/31/2023
1.0.0-preview50 159 10/31/2023
1.0.0-preview48 152 10/31/2023
1.0.0-preview46 141 10/31/2023
1.0.0-preview45 160 10/31/2023
1.0.0-preview44 150 10/31/2023
1.0.0-preview43 172 10/31/2023
1.0.0-preview42 183 10/30/2023
1.0.0-preview41 166 10/30/2023
1.0.0-preview40 159 10/27/2023
1.0.0-preview39 184 10/27/2023
1.0.0-preview38 157 10/27/2023
1.0.0-preview37 179 10/27/2023
1.0.0-preview36 134 10/27/2023
1.0.0-preview35 154 10/27/2023
1.0.0-preview34 147 10/27/2023
1.0.0-preview33 167 10/26/2023
1.0.0-preview32 174 10/26/2023
1.0.0-preview31 171 10/26/2023
1.0.0-preview30 171 10/26/2023
1.0.0-preview29 154 10/26/2023
1.0.0-preview28 168 10/26/2023
1.0.0-preview27 161 10/26/2023
1.0.0-preview26 184 10/25/2023
1.0.0-preview25 186 10/23/2023
1.0.0-preview24 160 10/23/2023
1.0.0-preview23 155 10/23/2023
1.0.0-preview22 161 10/23/2023
1.0.0-preview21 158 10/23/2023
1.0.0-preview20 163 10/20/2023
1.0.0-preview19 180 10/19/2023
1.0.0-preview18 161 10/18/2023
1.0.0-preview16 168 10/11/2023
1.0.0-preview14 194 10/10/2023
1.0.0-preview13 167 10/10/2023
1.0.0-preview12 149 10/9/2023
1.0.0-preview101 160 1/5/2024