Dosaic.Plugins.Messaging.MassTransit
1.2.7
dotnet add package Dosaic.Plugins.Messaging.MassTransit --version 1.2.7
NuGet\Install-Package Dosaic.Plugins.Messaging.MassTransit -Version 1.2.7
<PackageReference Include="Dosaic.Plugins.Messaging.MassTransit" Version="1.2.7" />
<PackageVersion Include="Dosaic.Plugins.Messaging.MassTransit" Version="1.2.7" />
<PackageReference Include="Dosaic.Plugins.Messaging.MassTransit" />
paket add Dosaic.Plugins.Messaging.MassTransit --version 1.2.7
#r "nuget: Dosaic.Plugins.Messaging.MassTransit, 1.2.7"
#:package Dosaic.Plugins.Messaging.MassTransit@1.2.7
#addin nuget:?package=Dosaic.Plugins.Messaging.MassTransit&version=1.2.7
#tool nuget:?package=Dosaic.Plugins.Messaging.MassTransit&version=1.2.7
Dosaic.Plugins.Messaging.MassTransit
Dosaic.Plugins.Messaging.MassTransit is a plugin that provides message-bus capabilities for Dosaic applications using MassTransit over RabbitMQ (or an in-memory transport for integration tests). It auto-discovers all IMessageConsumer<T> implementations at startup, wires them to their queues, and exposes a simple IMessageBus abstraction for sending and scheduling messages.
Installation
dotnet add package Dosaic.Plugins.Messaging.MassTransit
Or add a package reference to your .csproj:
<PackageReference Include="Dosaic.Plugins.Messaging.MassTransit" Version="" />
Configuration
The plugin is activated automatically by the Dosaic plugin system. Configure it in your appsettings.yml (or appsettings.json) under the messageBus key:
messageBus:
host: localhost
port: 5672
vhost: /
username: rabbitmq
password: rabbitmq
useRetry: false
maxRetryCount: 3
retryDelaySeconds: 30
maxRedeliveryCount: 3
redeliveryDelaySeconds: 30
deduplication: false
useInMemory: false
| Property | Type | Default | Description |
|---|---|---|---|
host |
string |
"" |
RabbitMQ hostname |
port |
ushort |
5672 |
RabbitMQ AMQP port |
vhost |
string |
"/" |
RabbitMQ virtual host |
username |
string |
— | RabbitMQ username (optional) |
password |
string |
— | RabbitMQ password (optional) |
useRetry |
bool |
false |
Enable immediate message retry on consumer failure |
maxRetryCount |
int |
3 |
Number of immediate retry attempts |
retryDelaySeconds |
int |
30 |
Delay between immediate retries (seconds) |
maxRedeliveryCount |
int |
3 |
Number of delayed redelivery attempts |
redeliveryDelaySeconds |
int |
30 |
Delay between redelivery attempts (seconds) |
deduplication |
bool |
false |
Enable message deduplication via x-deduplication-header |
useInMemory |
bool |
false |
Use in-memory transport instead of RabbitMQ (useful for testing) |
Note: When
useInMemoryistrue, all RabbitMQ settings are ignored.
Usage
Defining Messages
All messages must implement IMessage from Dosaic.Plugins.Messaging.Abstractions:
using Dosaic.Plugins.Messaging.Abstractions;
// Simple message
public record OrderPlaced(Guid OrderId, decimal Total) : IMessage;
// Generic message — queue name becomes "Notification-String"
public record Notification<T>(T Payload) : IMessage;
Queue Name Resolution
Queue names are resolved automatically from the message type name. You can override this with [QueueName]:
using Dosaic.Plugins.Messaging.Abstractions;
using Dosaic.Plugins.Messaging.MassTransit;
// Queue name: "OrderPlaced" (from type name)
public record OrderPlaced(Guid OrderId) : IMessage;
// Queue name: "order-placed-v2" (from attribute)
[QueueName("order-placed-v2")]
public record OrderPlacedV2(Guid OrderId) : IMessage;
// Queue name: "Notification-String" (generic type segments joined with "-")
public record Notification<T>(T Payload) : IMessage;
Implementing Consumers
Implement IMessageConsumer<TMessage> for each message type you want to handle. Multiple consumers for the same message type are supported — they are all invoked concurrently:
using Dosaic.Plugins.Messaging.Abstractions;
public class OrderPlacedConsumer : IMessageConsumer<OrderPlaced>
{
public async Task ProcessAsync(OrderPlaced message, CancellationToken cancellationToken)
{
Console.WriteLine($"Order {message.OrderId} placed for {message.Total}");
}
}
// Second consumer on the same queue — both are called for every message
public class OrderPlacedAuditConsumer : IMessageConsumer<OrderPlaced>
{
public async Task ProcessAsync(OrderPlaced message, CancellationToken cancellationToken)
{
Console.WriteLine($"Audit: order {message.OrderId}");
}
}
Consumers are auto-discovered via the Dosaic source generator and registered automatically — no manual registration is required.
Sending Messages
Inject IMessageBus and call SendAsync:
using Dosaic.Plugins.Messaging.Abstractions;
public class OrderService(IMessageBus bus)
{
public async Task PlaceOrderAsync(Guid orderId, decimal total, CancellationToken ct)
{
await bus.SendAsync(new OrderPlaced(orderId, total), cancellationToken: ct);
}
}
You can also send with custom headers:
var headers = new Dictionary<string, string>
{
["x-correlation-id"] = correlationId,
["x-tenant-id"] = tenantId
};
await bus.SendAsync(new OrderPlaced(orderId, total), headers, ct);
If no consumer is registered for the message type, SendAsync is a no-op (no exception is thrown).
Scheduling Messages
Schedule a message to be delivered after a delay or at an absolute time. Scheduling requires IMessageScheduler to be available in the container (provided by MassTransit's delayed-message scheduler):
// Deliver in 10 minutes
await bus.ScheduleAsync(new OrderPlaced(orderId, total), TimeSpan.FromMinutes(10), cancellationToken: ct);
// Deliver at a specific UTC time
await bus.ScheduleAsync(new OrderPlaced(orderId, total), DateTime.UtcNow.AddHours(2), cancellationToken: ct);
Message Deduplication
Enable deduplication in configuration (deduplication: true). By default, deduplicate keys are derived from a SHA-256 hash of the JSON-serialised message body prefixed with the full type name.
You can register a custom key factory per message type via IMessageDeduplicateKeyProvider:
using Dosaic.Plugins.Messaging.MassTransit;
public class MyStartup(IMessageDeduplicateKeyProvider dedup)
{
public void Configure()
{
// Use a stable business key instead of JSON hash
dedup.Register<OrderPlaced>(msg => $"order:{msg.OrderId}");
}
}
The deduplication key is written to the x-deduplication-header AMQP header. You must pair this with a RabbitMQ deduplication plugin on the broker side.
Advanced Bus Configuration via IMessageBusConfigurator
Implement IMessageBusConfigurator to hook into the MassTransit configuration pipeline. Any number of configurators are discovered automatically and applied in order:
using Dosaic.Plugins.Messaging.MassTransit;
using MassTransit;
public class MyBusConfigurator : IMessageBusConfigurator
{
// Called once during IBusRegistrationConfigurator setup
public void ConfigureMassTransit(IBusRegistrationConfigurator opts)
{
opts.AddRequestClient<OrderPlaced>();
}
// Called for the RabbitMQ bus factory (not invoked when useInMemory = true)
public void ConfigureRabbitMq(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator config)
{
config.UseMessageData(/* ... */);
}
// Called for each receive endpoint (queue)
public void ConfigureReceiveEndpoint(IBusRegistrationContext context, Uri queue,
IRabbitMqReceiveEndpointConfigurator configurator)
{
configurator.PrefetchCount = 10;
}
}
Features
- Auto-discovery — all
IMessageConsumer<T>implementations in the application are found at startup without explicit registration. - Multiple consumers per queue — any number of consumers may handle the same message type; they are run concurrently, and all exceptions are collected and re-thrown as an
AggregateException. - Built-in retry — configurable immediate retry (
useRetry,maxRetryCount,retryDelaySeconds) and delayed redelivery (maxRedeliveryCount,redeliveryDelaySeconds) using MassTransit middleware. - Per-consumer Polly retry — each consumer call is wrapped in a
WaitAndRetryAsyncPolly policy with up to 2 exponential back-off attempts before the error is propagated to MassTransit's error queue (QUEUE_NAME_error). - Scheduled sending — send messages at a future point in time using
ScheduleAsyncwith aTimeSpanorDateTime. - Custom message headers — pass arbitrary headers via
IDictionary<string, string>overloads ofSendAsync/ScheduleAsync. - Message deduplication — SHA-256 content hash (or custom factory) written to
x-deduplication-header. - Queue name customisation — automatic resolution from type name;
[QueueName("...")]attribute for explicit overrides; generic types produce hyphen-joined segment names. - In-memory transport — set
useInMemory: truefor integration tests without a running RabbitMQ broker. - OpenTelemetry tracing — MassTransit activity source is registered automatically with the OpenTelemetry
TracerProvider. - Health checks — a MassTransit health check named
message-busis registered under thereadinesstag; reportsUnhealthyon failure. - Extensible —
IMessageBusConfiguratorallows full access to the MassTransit configuration API for advanced scenarios.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Dosaic.Hosting.Abstractions (>= 1.2.7)
- Dosaic.Plugins.Messaging.Abstractions (>= 1.2.7)
- MassTransit (>= 8.5.7)
- MassTransit.RabbitMQ (>= 8.5.7)
- OpenTelemetry.Extensions.Hosting (>= 1.15.0)
- Polly (>= 8.6.5)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.2.7 | 64 | 3/4/2026 |
| 1.2.6 | 87 | 2/19/2026 |
| 1.2.5 | 83 | 2/17/2026 |
| 1.2.4 | 106 | 2/13/2026 |
| 1.2.3 | 113 | 1/27/2026 |
| 1.2.2 | 303 | 12/16/2025 |
| 1.2.1 | 274 | 12/16/2025 |
| 1.2.0 | 435 | 12/11/2025 |
| 1.1.21 | 453 | 12/10/2025 |
| 1.1.20 | 418 | 11/18/2025 |
| 1.1.19 | 298 | 11/11/2025 |
| 1.1.18 | 221 | 10/14/2025 |
| 1.1.17 | 205 | 10/1/2025 |
| 1.1.16 | 214 | 9/25/2025 |
| 1.1.15 | 195 | 9/24/2025 |
| 1.1.14 | 209 | 9/24/2025 |
| 1.1.13 | 200 | 9/24/2025 |
| 1.1.12 | 329 | 9/16/2025 |
| 1.1.11 | 181 | 7/18/2025 |
| 1.1.10 | 167 | 7/18/2025 |