Muflone.Transport.RabbitMQ 10.1.0

dotnet add package Muflone.Transport.RabbitMQ --version 10.1.0
                    
NuGet\Install-Package Muflone.Transport.RabbitMQ -Version 10.1.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Muflone.Transport.RabbitMQ" Version="10.1.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Muflone.Transport.RabbitMQ" Version="10.1.0" />
                    
Directory.Packages.props
<PackageReference Include="Muflone.Transport.RabbitMQ" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Muflone.Transport.RabbitMQ --version 10.1.0
                    
#r "nuget: Muflone.Transport.RabbitMQ, 10.1.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Muflone.Transport.RabbitMQ@10.1.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Muflone.Transport.RabbitMQ&version=10.1.0
                    
Install as a Cake Addin
#tool nuget:?package=Muflone.Transport.RabbitMQ&version=10.1.0
                    
Install as a Cake Tool

Muflone.Transport.RabbitMQ

NuGet License: MIT

Muflone extension to manage queues and topics on RabbitMQ, designed for CQRS and Event Sourcing architectures.

Breaking Changes

v10.1.0 — Queue and Exchange Durability

ACTION REQUIRED before upgrading to v10.1.0

All exchanges and queues are now declared as durable (persisted to disk and survived broker restarts). Previously they were non-durable/transient.

RabbitMQ does not allow redeclaring an existing queue or exchange with different durability settings — the broker will reject the connection with a channel error.

Before starting your application with v10.1.0 you must delete all existing queues and exchanges that were created by a previous version of this library. You can do this via:

  • The RabbitMQ Management UI (http://localhost:15672) — navigate to Queues and Exchanges and delete each one.
  • The RabbitMQ CLI: rabbitmqctl delete_queue <queue-name> / rabbitmqctl delete_exchange <exchange-name>.
  • The HTTP API: DELETE /api/queues/%2F/<queue-name> / DELETE /api/exchanges/%2F/<exchange-name>.

Once deleted, the library will recreate them with the correct durable settings on startup.


Features

  • CQRS Support - Separate commands (Direct exchange) and events (Topic exchange) with dedicated routing
  • Built-in Resilience - Automatic retry policies via Polly for both connections and message publishing
  • OpenTelemetry Tracing - Distributed tracing with W3C Trace Context propagation across service boundaries
  • Durable Messaging - Exchanges, queues, and messages are persisted to disk for reliability
  • Async-first - All operations use async/await
  • .NET 10 - Built for net10.0 with nullable reference types

Install

dotnet add package Muflone.Transport.RabbitMQ

or

Install-Package Muflone.Transport.RabbitMQ

Architecture Overview

The library uses two RabbitMQ exchange types to separate concerns:

Concept Exchange Type Routing Queue Behavior
Commands Direct One-to-one Exclusive, durable
Events Topic One-to-many Shared across consumers, durable

Commands are sent to a Direct exchange and delivered to a single consumer (point-to-point). Domain Events and Integration Events are published to a Topic exchange and can be consumed by multiple subscribers (pub/sub).

Quick Start

1. Define your messages

Commands and events must extend the Muflone base classes:

public class CreateOrder : Command
{
    public readonly string OrderNumber;

    public CreateOrder(OrderId aggregateId, string orderNumber)
        : base(aggregateId)
    {
        OrderNumber = orderNumber;
    }
}

public class OrderCreated : DomainEvent
{
    public readonly string OrderNumber;

    public OrderCreated(OrderId aggregateId, string orderNumber)
        : base(aggregateId)
    {
        OrderNumber = orderNumber;
    }
}

2. Create command and event handlers

public class CreateOrderCommandHandler : ICommandHandlerAsync<CreateOrder>
{
    private readonly IRepository _repository;

    public CreateOrderCommandHandler(IRepository repository)
    {
        _repository = repository;
    }

    public async Task HandleAsync(CreateOrder command, CancellationToken cancellationToken = default)
    {
        // Handle the command: load aggregate, apply business logic, persist
        var order = Order.CreateOrder(command.AggregateId, command.OrderNumber);
        await _repository.SaveAsync(order, Guid.NewGuid(), cancellationToken);
    }
}

public class OrderCreatedEventHandler : IDomainEventHandlerAsync<OrderCreated>
{
    private readonly ILogger _logger;

    public OrderCreatedEventHandler(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger(GetType());
    }

    public Task HandleAsync(OrderCreated @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Order created: {OrderNumber}", @event.OrderNumber);
        return Task.CompletedTask;
    }
}

3. Create consumers

Consumers wire messages to their handlers. Extend the appropriate base class:

// Command consumer (one handler per command)
public class CreateOrderConsumer : CommandConsumerBase<CreateOrder>
{
    protected override ICommandHandlerAsync<CreateOrder> HandlerAsync { get; }

    public CreateOrderConsumer(
        IRepository repository,
        IRabbitMQConnectionFactory connectionFactory,
        ILoggerFactory loggerFactory)
        : base(repository, connectionFactory, loggerFactory)
    {
        HandlerAsync = new CreateOrderCommandHandler(repository);
    }
}

// Domain event consumer (supports multiple handlers per event)
public class OrderCreatedConsumer : DomainEventsConsumerBase<OrderCreated>
{
    protected override IEnumerable<IDomainEventHandlerAsync<OrderCreated>> HandlersAsync { get; }

    public OrderCreatedConsumer(
        IRabbitMQConnectionFactory connectionFactory,
        ILoggerFactory loggerFactory)
        : base(connectionFactory, loggerFactory)
    {
        HandlersAsync = new List<IDomainEventHandlerAsync<OrderCreated>>
        {
            new OrderCreatedEventHandler(loggerFactory)
        };
    }
}

// Integration event consumer (for cross-boundary events)
public class OrderShippedConsumer : IntegrationEventsConsumerBase<OrderShipped>
{
    protected override IEnumerable<IIntegrationEventHandlerAsync<OrderShipped>> HandlersAsync { get; }

    public OrderShippedConsumer(
        IRabbitMQConnectionFactory connectionFactory,
        ILoggerFactory loggerFactory)
        : base(connectionFactory, loggerFactory)
    {
        HandlersAsync = new List<IIntegrationEventHandlerAsync<OrderShipped>>
        {
            new OrderShippedIntegrationHandler(loggerFactory)
        };
    }
}

4. Register services in DI

var builder = WebApplication.CreateBuilder(args);

// Configure RabbitMQ connection
var rabbitMQConfiguration = new RabbitMQConfiguration(
    hostName: "localhost",
    userName: "guest",
    password: "guest",
    exchangeCommandsName: "Muflone.Commands",
    exchangeEventsName: "Muflone.Events",
    clientId: "OrderService"
);

// Register handlers
builder.Services.AddScoped<ICommandHandlerAsync<CreateOrder>, CreateOrderCommandHandler>();
builder.Services.AddScoped<IDomainEventHandlerAsync<OrderCreated>, OrderCreatedEventHandler>();

// Register Muflone RabbitMQ transport
builder.Services.AddMufloneTransportRabbitMQ(loggerFactory, rabbitMQConfiguration);

5. Send commands and publish events

// Inject IServiceBus to send commands
public class OrderController : ControllerBase
{
    private readonly IServiceBus _serviceBus;

    public OrderController(IServiceBus serviceBus)
    {
        _serviceBus = serviceBus;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
    {
        var command = new CreateOrder(
            new OrderId(Guid.NewGuid()),
            request.OrderNumber);

        await _serviceBus.SendAsync(command);
        return Accepted();
    }
}

// Inject IEventBus to publish events
public class OrderService
{
    private readonly IEventBus _eventBus;

    public OrderService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task NotifyOrderCreated(OrderId orderId, string orderNumber)
    {
        var @event = new OrderCreated(orderId, orderNumber);
        await _eventBus.PublishAsync(@event);
    }
}

Configuration

RabbitMQConfiguration supports several constructor overloads:

// Basic (default retry delay: 30s, default vhost: "/")
new RabbitMQConfiguration(hostName, userName, password,
    exchangeCommandsName, exchangeEventsName, clientId);

// With custom retry delay
new RabbitMQConfiguration(hostName, userName, password,
    TimeSpan.FromSeconds(60),
    exchangeCommandsName, exchangeEventsName, clientId);

// With virtual host and custom retry delay
new RabbitMQConfiguration(hostName, "/my-vhost", userName, password,
    TimeSpan.FromSeconds(60),
    exchangeCommandsName, exchangeEventsName, clientId);
Parameter Description Default
hostName RabbitMQ server hostname -
vhost RabbitMQ virtual host "/"
userName Authentication username -
password Authentication password -
retryDelay Delay between connection retry attempts 30s
exchangeCommandsName Name of the Direct exchange for commands -
exchangeEventsName Name of the Topic exchange for events -
clientId Unique identifier for this service (used in queues) -

OpenTelemetry Integration

The library includes built-in distributed tracing using System.Diagnostics.ActivitySource. Trace context is automatically propagated through RabbitMQ message headers using the W3C Trace Context standard (traceparent / tracestate).

The easiest way to enable full distributed tracing is to install the Muflone.OpenTelemetry package:

dotnet add package Muflone.OpenTelemetry

Then register it with just two lines of code:

builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddMufloneInstrumentation()  // registers all Muflone activity sources
        .AddOtlpExporter());

builder.Services.AddMufloneOpenTelemetry();  // decorates IServiceBus, IEventBus, and IRepository with instrumented wrappers

This automatically instruments command handlers, event handlers, service bus, event bus, and repository operations with zero changes to your existing code.

Manual Configuration

If you prefer to configure tracing manually without the Muflone.OpenTelemetry package, you can register the activity sources individually:

builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource("Muflone.ServiceBus")
        .AddSource("Muflone.EventBus")
        .AddOtlpExporter());

Two activity sources are available:

Activity Source Operations
Muflone.ServiceBus Command send
Muflone.EventBus Event publish

You can also pass trace context manually through UserProperties when sending commands or publishing events from within a handler:

var command = new CreateOrder(aggregateId, orderNumber);
command.UserProperties["traceparent"] = Activity.Current?.Id;
command.UserProperties["tracestate"] = Activity.Current?.TraceStateString;

await serviceBus.SendAsync(command);

Resilience

The library uses Polly for resilience at two levels:

  • Connection: 5 retries with exponential backoff on connection failures, with automatic reconnection on shutdown or channel exceptions
  • Message publishing: 3 retries with exponential backoff (2, 4, 8 seconds) for both SendAsync and PublishAsync

Registered Services

AddMufloneTransportRabbitMQ registers the following services as singletons:

Interface Implementation
IRabbitMQConnectionFactory RabbitMQConnectionFactory
IServiceBus ServiceBus
IEventBus ServiceBus
IMessageSubscriber RabbitMQSubscriber
IHostedService RabbitMQStarter
IHostedService MessageHandlersStarter

Fully working example

You can find a fully working example here: https://github.com/BrewUp/

License

This project is licensed under the MIT License.

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Muflone.Transport.RabbitMQ:

Package Downloads
Muflone.Transport.RabbitMQ.Saga

Muflone saga implementation for RabbitMQ

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
10.1.0 84 3/1/2026
10.1.0-rc2 88 2/12/2026
10.1.0-rc1 83 2/12/2026
10.0.2 210 2/11/2026
10.0.1 325 12/18/2025
8.5.0 768 10/2/2025
8.5.0-beta.1 297 9/4/2025
8.4.0 1,305 5/20/2025
8.3.2-alpha 241 5/16/2025
8.3.1-alpha 194 5/8/2025
8.2.1 564 4/8/2025
8.2.0 1,494 12/4/2024
8.1.1 1,063 9/9/2024 8.1.1 is deprecated because it has critical bugs.
8.0.3 287 8/2/2024
8.0.2 194 8/1/2024
8.0.1 175 8/1/2024
8.0.0 180 8/1/2024
6.2.3 502 10/24/2023
6.2.2 274 7/10/2023
Loading failed

Fix durability on Queues -> It is important that you delete your existing active queues in RabbitMQ before updating.
Improve support to Muflone.OpenTelemetry