Dosaic.Plugins.Messaging.MassTransit 1.2.7

Dosaic.Plugins.Messaging.MassTransit

Dosaic.Plugins.Messaging.MassTransit is a plugin that provides message-bus capabilities for Dosaic applications using MassTransit over RabbitMQ (or an in-memory transport for integration tests). It auto-discovers all IMessageConsumer<T> implementations at startup, wires them to their queues, and exposes a simple IMessageBus abstraction for sending and scheduling messages.

Installation

dotnet add package Dosaic.Plugins.Messaging.MassTransit

Or add a package reference to your .csproj:

<PackageReference Include="Dosaic.Plugins.Messaging.MassTransit" Version="1.2.7" />

Configuration

The plugin is activated automatically by the Dosaic plugin system. Configure it in your appsettings.yml (or appsettings.json) under the messageBus key:

messageBus:
  host: localhost
  port: 5672
  vhost: /
  username: rabbitmq
  password: rabbitmq
  useRetry: false
  maxRetryCount: 3
  retryDelaySeconds: 30
  maxRedeliveryCount: 3
  redeliveryDelaySeconds: 30
  deduplication: false
  useInMemory: false

| Property | Type | Default | Description |
|---|---|---|---|
| host | string | "" | RabbitMQ hostname |
| port | ushort | 5672 | RabbitMQ AMQP port |
| vhost | string | "/" | RabbitMQ virtual host |
| username | string | | RabbitMQ username (optional) |
| password | string | | RabbitMQ password (optional) |
| useRetry | bool | false | Enable immediate message retry on consumer failure |
| maxRetryCount | int | 3 | Number of immediate retry attempts |
| retryDelaySeconds | int | 30 | Delay between immediate retries (seconds) |
| maxRedeliveryCount | int | 3 | Number of delayed redelivery attempts |
| redeliveryDelaySeconds | int | 30 | Delay between redelivery attempts (seconds) |
| deduplication | bool | false | Enable message deduplication via x-deduplication-header |
| useInMemory | bool | false | Use in-memory transport instead of RabbitMQ (useful for testing) |

Note: When useInMemory is true, all RabbitMQ settings are ignored.

Usage

Defining Messages

All messages must implement IMessage from Dosaic.Plugins.Messaging.Abstractions:

using Dosaic.Plugins.Messaging.Abstractions;

// Simple message
public record OrderPlaced(Guid OrderId, decimal Total) : IMessage;

// Generic message: for example, Notification<string> resolves to the queue name "Notification-String"
public record Notification<T>(T Payload) : IMessage;

Queue Name Resolution

Queue names are resolved automatically from the message type name. You can override this with [QueueName]:

using Dosaic.Plugins.Messaging.Abstractions;
using Dosaic.Plugins.Messaging.MassTransit;

// Queue name: "OrderPlaced" (from type name)
public record OrderPlaced(Guid OrderId) : IMessage;

// Queue name: "order-placed-v2" (from attribute)
[QueueName("order-placed-v2")]
public record OrderPlacedV2(Guid OrderId) : IMessage;

// Queue name for Notification<string>: "Notification-String" (generic type segments joined with "-")
public record Notification<T>(T Payload) : IMessage;
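
To make the naming rules above concrete, here is a minimal sketch of how such a resolution could work. It is not the plugin's implementation: QueueNameSketch and SketchQueueNameAttribute are hypothetical names declared only so the example compiles on its own, and the handling of nested generic arguments is an assumption.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for the plugin's [QueueName] attribute (assumption, not the real type).
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)]
public sealed class SketchQueueNameAttribute(string name) : Attribute
{
    public string Name { get; } = name;
}

public static class QueueNameSketch
{
    public static string Resolve(Type type)
    {
        // An explicit attribute takes precedence over the derived name.
        var attribute = type.GetCustomAttribute<SketchQueueNameAttribute>();
        if (attribute is not null) return attribute.Name;

        if (!type.IsGenericType) return type.Name;

        // Strip the arity suffix ("Notification`1" -> "Notification").
        var tick = type.Name.IndexOf('`');
        var baseName = tick < 0 ? type.Name : type.Name[..tick];

        // Append each generic argument, resolved recursively, joined with "-".
        var arguments = type.GetGenericArguments().Select(Resolve);
        return string.Join("-", new[] { baseName }.Concat(arguments));
    }
}
```

With this sketch, QueueNameSketch.Resolve(typeof(List<string>)) yields "List-String", which mirrors the "Notification-String" naming shown above.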

Implementing Consumers

Implement IMessageConsumer<TMessage> for each message type you want to handle. Multiple consumers for the same message type are supported — they are all invoked concurrently:

using Dosaic.Plugins.Messaging.Abstractions;

public class OrderPlacedConsumer : IMessageConsumer<OrderPlaced>
{
    // Nothing is awaited here, so return a completed task instead of marking the method async.
    public Task ProcessAsync(OrderPlaced message, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Order {message.OrderId} placed for {message.Total}");
        return Task.CompletedTask;
    }
}

// Second consumer on the same queue; both are called for every message
public class OrderPlacedAuditConsumer : IMessageConsumer<OrderPlaced>
{
    public Task ProcessAsync(OrderPlaced message, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Audit: order {message.OrderId}");
        return Task.CompletedTask;
    }
}

Consumers are auto-discovered via the Dosaic source generator and registered automatically — no manual registration is required.
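
The concurrent invocation of several consumers, with failures collected into an AggregateException, can be pictured roughly as follows. This is a sketch of the general technique using Task.WhenAll, not the plugin's code. FanOutSketch and the delegate-based handler shape are hypothetical and chosen only to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FanOutSketch
{
    public static async Task DispatchAsync<TMessage>(
        TMessage message,
        IEnumerable<Func<TMessage, CancellationToken, Task>> handlers,
        CancellationToken cancellationToken = default)
    {
        // Start every handler before awaiting any of them. Task.Run also captures
        // exceptions that a handler throws synchronously.
        var tasks = handlers
            .Select(handler => Task.Run(() => handler(message, cancellationToken), cancellationToken))
            .ToArray();

        var all = Task.WhenAll(tasks);
        try
        {
            await all;
        }
        catch
        {
            // Awaiting surfaces only the first failure; the combined task holds all of them.
            if (all.Exception is not null)
                throw new AggregateException(all.Exception.InnerExceptions);
            throw; // cancellation only: no faulted handlers to aggregate
        }
    }
}
```

One consequence of this design is that a slow or failing consumer does not prevent the others from running, but the message as a whole still counts as failed if any handler throws.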

Sending Messages

Inject IMessageBus and call SendAsync:

using Dosaic.Plugins.Messaging.Abstractions;

public class OrderService(IMessageBus bus)
{
    public async Task PlaceOrderAsync(Guid orderId, decimal total, CancellationToken ct)
    {
        await bus.SendAsync(new OrderPlaced(orderId, total), cancellationToken: ct);
    }
}

You can also send with custom headers:

var headers = new Dictionary<string, string>
{
    ["x-correlation-id"] = correlationId,
    ["x-tenant-id"] = tenantId
};
await bus.SendAsync(new OrderPlaced(orderId, total), headers, ct);

If no consumer is registered for the message type, SendAsync is a no-op (no exception is thrown).

Scheduling Messages

Schedule a message to be delivered after a delay or at an absolute time. Scheduling requires IMessageScheduler to be available in the container (provided by MassTransit's delayed-message scheduler):

// Deliver in 10 minutes
await bus.ScheduleAsync(new OrderPlaced(orderId, total), TimeSpan.FromMinutes(10), cancellationToken: ct);

// Deliver at a specific UTC time
await bus.ScheduleAsync(new OrderPlaced(orderId, total), DateTime.UtcNow.AddHours(2), cancellationToken: ct);

Message Deduplication

Enable deduplication in configuration (deduplication: true). By default, the deduplication key is derived from a SHA-256 hash of the JSON-serialised message body, prefixed with the full type name.
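
As an illustration of a content-derived key, the following sketch hashes the JSON body together with the full type name. The exact layout is an assumption: here the type name is prepended to the JSON before hashing and the result is hex-encoded, which may differ from what the plugin actually writes. DeduplicationKeySketch is a hypothetical name.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;

public static class DeduplicationKeySketch
{
    public static string Compute<T>(T message) where T : notnull
    {
        // Serialise using the runtime type so derived properties are included.
        var type = message.GetType();
        var json = JsonSerializer.Serialize(message, type);

        // Prefix with the full type name (assumed layout) so that two message
        // types with identical payloads do not produce the same key.
        var bytes = Encoding.UTF8.GetBytes($"{type.FullName}:{json}");

        // 32-byte SHA-256 digest rendered as 64 hexadecimal characters.
        return Convert.ToHexString(SHA256.HashData(bytes));
    }
}
```

Two messages with equal content produce the same key under this scheme, so any field that changes per send (a timestamp, for instance) would defeat content-based deduplication.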

You can register a custom key factory per message type via IMessageDeduplicateKeyProvider:

using Dosaic.Plugins.Messaging.MassTransit;

public class MyStartup(IMessageDeduplicateKeyProvider dedup)
{
    public void Configure()
    {
        // Use a stable business key instead of JSON hash
        dedup.Register<OrderPlaced>(msg => $"order:{msg.OrderId}");
    }
}

The deduplication key is written to the x-deduplication-header AMQP header. You must pair this with a RabbitMQ deduplication plugin on the broker side.

Advanced Bus Configuration via IMessageBusConfigurator

Implement IMessageBusConfigurator to hook into the MassTransit configuration pipeline. Any number of configurators are discovered automatically and applied in order:

using Dosaic.Plugins.Messaging.MassTransit;
using MassTransit;

public class MyBusConfigurator : IMessageBusConfigurator
{
    // Called once during IBusRegistrationConfigurator setup
    public void ConfigureMassTransit(IBusRegistrationConfigurator opts)
    {
        opts.AddRequestClient<OrderPlaced>();
    }

    // Called for the RabbitMQ bus factory (not invoked when useInMemory = true)
    public void ConfigureRabbitMq(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator config)
    {
        config.UseMessageData(/* ... */);
    }

    // Called for each receive endpoint (queue)
    public void ConfigureReceiveEndpoint(IBusRegistrationContext context, Uri queue,
        IRabbitMqReceiveEndpointConfigurator configurator)
    {
        configurator.PrefetchCount = 10;
    }
}

Features

  • Auto-discovery — all IMessageConsumer<T> implementations in the application are found at startup without explicit registration.
  • Multiple consumers per queue — any number of consumers may handle the same message type; they are run concurrently, and all exceptions are collected and re-thrown as an AggregateException.
  • Built-in retry — configurable immediate retry (useRetry, maxRetryCount, retryDelaySeconds) and delayed redelivery (maxRedeliveryCount, redeliveryDelaySeconds) using MassTransit middleware.
  • Per-consumer Polly retry — each consumer call is wrapped in a WaitAndRetryAsync Polly policy with up to 2 exponential back-off attempts before the error is propagated to MassTransit's error queue (QUEUE_NAME_error).
  • Scheduled sending — send messages at a future point in time using ScheduleAsync with a TimeSpan or DateTime.
  • Custom message headers — pass arbitrary headers via IDictionary<string, string> overloads of SendAsync / ScheduleAsync.
  • Message deduplication — SHA-256 content hash (or custom factory) written to x-deduplication-header.
  • Queue name customisation — automatic resolution from type name; [QueueName("...")] attribute for explicit overrides; generic types produce hyphen-joined segment names.
  • In-memory transport — set useInMemory: true for integration tests without a running RabbitMQ broker.
  • OpenTelemetry tracing — MassTransit activity source is registered automatically with the OpenTelemetry TracerProvider.
  • Health checks — a MassTransit health check named message-bus is registered under the readiness tag; reports Unhealthy on failure.
  • Extensibility through IMessageBusConfigurator, which allows full access to the MassTransit configuration API for advanced scenarios.
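
The per-consumer retry listed above uses Polly's WaitAndRetryAsync. To show the behaviour without depending on Polly, the sketch below replaces it with a hand-written loop: up to two retries with exponentially growing waits, after which the error propagates. RetrySketch is a hypothetical name, and the one-second base delay is an assumption, since the delay values the plugin uses are not stated here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetrySketch
{
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> action,
        int retryCount = 2,
        TimeSpan? baseDelay = null,
        CancellationToken cancellationToken = default)
    {
        var delay = baseDelay ?? TimeSpan.FromSeconds(1); // assumed base delay

        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await action(cancellationToken);
                return;
            }
            catch (Exception ex) when (attempt < retryCount && ex is not OperationCanceledException)
            {
                // Wait baseDelay * 2^attempt before the next try: 1x, then 2x.
                await Task.Delay(delay * Math.Pow(2, attempt), cancellationToken);
            }
            // Once the retries are used up the filter no longer matches,
            // so the exception propagates to the caller.
        }
    }
}
```

With the defaults, an action that always fails is invoked three times in total (the initial call plus two retries) before its exception reaches the caller.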

Target frameworks: net10.0 is compatible. The platform-specific variants net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows were computed as compatible.

| Version | Downloads | Last Updated |
|---|---|---|
| 1.2.7 | 64 | 3/4/2026 |
| 1.2.6 | 87 | 2/19/2026 |
| 1.2.5 | 83 | 2/17/2026 |
| 1.2.4 | 106 | 2/13/2026 |
| 1.2.3 | 113 | 1/27/2026 |
| 1.2.2 | 303 | 12/16/2025 |
| 1.2.1 | 274 | 12/16/2025 |
| 1.2.0 | 435 | 12/11/2025 |
| 1.1.21 | 453 | 12/10/2025 |
| 1.1.20 | 418 | 11/18/2025 |
| 1.1.19 | 298 | 11/11/2025 |
| 1.1.18 | 221 | 10/14/2025 |
| 1.1.17 | 205 | 10/1/2025 |
| 1.1.16 | 214 | 9/25/2025 |
| 1.1.15 | 195 | 9/24/2025 |
| 1.1.14 | 209 | 9/24/2025 |
| 1.1.13 | 200 | 9/24/2025 |
| 1.1.12 | 329 | 9/16/2025 |
| 1.1.11 | 181 | 7/18/2025 |
| 1.1.10 | 167 | 7/18/2025 |