NetEvolve.Pulse.RabbitMQ
0.67.36
Prefix Reserved
dotnet add package NetEvolve.Pulse.RabbitMQ --version 0.67.36
NuGet\Install-Package NetEvolve.Pulse.RabbitMQ -Version 0.67.36
<PackageReference Include="NetEvolve.Pulse.RabbitMQ" Version="0.67.36" />
<PackageVersion Include="NetEvolve.Pulse.RabbitMQ" Version="0.67.36" />
<PackageReference Include="NetEvolve.Pulse.RabbitMQ" />
paket add NetEvolve.Pulse.RabbitMQ --version 0.67.36
#r "nuget: NetEvolve.Pulse.RabbitMQ, 0.67.36"
#:package NetEvolve.Pulse.RabbitMQ@0.67.36
#addin nuget:?package=NetEvolve.Pulse.RabbitMQ&version=0.67.36
#tool nuget:?package=NetEvolve.Pulse.RabbitMQ&version=0.67.36
NetEvolve.Pulse.RabbitMQ
RabbitMQ transport for the Pulse outbox pattern. Publishes outbox messages directly to RabbitMQ exchanges using the official .NET client, enabling reliable event delivery without routing through Dapr or other intermediaries.
Features
- Direct Publishing: Send messages to RabbitMQ exchanges without additional infrastructure
- Flexible Routing: Automatic routing key resolution based on event types via
ITopicNameResolver - Health Checks: Verify connection and channel state for readiness probing
- Batch Support: Efficient batch publishing using parallel execution (default implementation)
- Connection Management: Singleton connection with lazy channel initialization
Installation
NuGet Package Manager
Install-Package NetEvolve.Pulse.RabbitMQ
.NET CLI
dotnet add package NetEvolve.Pulse.RabbitMQ
PackageReference
<PackageReference Include="NetEvolve.Pulse.RabbitMQ" Version="x.x.x" />
Quick Start
1. Add the RabbitMQ client package
dotnet add package RabbitMQ.Client
2. Register services
using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
using RabbitMQ.Client;
var services = new ServiceCollection();
// Register RabbitMQ connection before UseRabbitMqTransport
services.AddSingleton<IConnection>(sp =>
{
var factory = new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
VirtualHost = "/",
UserName = "guest",
Password = "guest"
};
return factory.CreateConnectionAsync().GetAwaiter().GetResult();
});
services.AddPulse(config => config
.AddOutbox(
options => options.Schema = "pulse",
processorOptions => processorOptions.BatchSize = 100)
.UseRabbitMqTransport(options =>
{
options.ExchangeName = "events";
}));
3. Store events via IEventOutbox
Use IEventOutbox to store events reliably. The outbox processor picks them up and publishes each one to the configured RabbitMQ exchange:
public class OrderService
{
private readonly IEventOutbox _outbox;
public OrderService(IEventOutbox outbox) => _outbox = outbox;
public async Task CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
// ... business logic ...
// Stored reliably; published via RabbitMQ when the processor runs
await _outbox.StoreAsync(new OrderCreatedEvent
{
OrderId = Guid.NewGuid(),
CustomerId = request.CustomerId
}, ct);
}
}
Transaction Integration
For reliable at-least-once delivery guarantees, store outbox events within the same database transaction as your business data. Pair the RabbitMQ transport with a persistence provider that supports transaction enlistment:
public class OrderService
{
private readonly ApplicationDbContext _context;
private readonly IEventOutbox _outbox;
public OrderService(ApplicationDbContext context, IEventOutbox outbox)
{
_context = context;
_outbox = outbox;
}
public async Task CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
// Begin transaction
await using var transaction = await _context.Database.BeginTransactionAsync(ct);
try
{
// Business operation
var order = new Order { CustomerId = request.CustomerId, Total = request.Total };
_context.Orders.Add(order);
await _context.SaveChangesAsync(ct);
// Store event in outbox (same transaction)
await _outbox.StoreAsync(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId
}, ct);
// Commit both business data and event atomically
await transaction.CommitAsync(ct);
}
catch
{
// Rollback discards both business data AND the outbox event
await transaction.RollbackAsync(ct);
throw;
}
}
}
The RabbitMQ transport only handles publishing. Transactional guarantees are provided by the persistence layer (e.g., NetEvolve.Pulse.EntityFramework or NetEvolve.Pulse.SqlServer).
Configuration
RabbitMqTransportOptions
| Property | Type | Default | Description |
|---|---|---|---|
ExchangeName |
string |
"" |
Target exchange for publishing (required) |
Routing Key Resolution
By default, the simple class name of the event type is used as the routing key. The assembly qualifier and namespace are stripped automatically via ITopicNameResolver.
EventType |
Resolved routing key |
|---|---|
MyApp.Events.OrderCreated, MyApp |
OrderCreated |
MyApp.Events.PaymentProcessed, MyApp |
PaymentProcessed |
Override the resolver for custom naming strategies:
services.AddSingleton<ITopicNameResolver, MyCustomTopicNameResolver>();
services.AddPulse(config => config
.UseRabbitMqTransport(options =>
{
options.ExchangeName = "events";
}));
Exchange Setup
The target RabbitMQ exchange must already exist. This transport does not auto-declare exchanges or queues.
Example: Topic Exchange
# Create a topic exchange for event routing
rabbitmqadmin declare exchange name=events type=topic durable=true
# Create queues and bind them to specific event types
rabbitmqadmin declare queue name=order-service durable=true
rabbitmqadmin declare binding source=events destination=order-service routing_key="OrderCreated"
rabbitmqadmin declare queue name=payment-service durable=true
rabbitmqadmin declare binding source=events destination=payment-service routing_key="PaymentProcessed"
Example: Fanout Exchange
# Create a fanout exchange for broadcasting to all subscribers
rabbitmqadmin declare exchange name=notifications type=fanout durable=true
# Create queues and bind them (no routing key needed for fanout)
rabbitmqadmin declare queue name=email-service durable=true
rabbitmqadmin declare binding source=notifications destination=email-service
rabbitmqadmin declare queue name=sms-service durable=true
rabbitmqadmin declare binding source=notifications destination=sms-service
Consumer Integration
Consume messages using the official RabbitMQ .NET client or any compatible library:
var factory = new ConnectionFactory { HostName = "localhost" };
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(
queue: "order-service",
durable: true,
exclusive: false,
autoDelete: false);
await channel.QueueBindAsync(
queue: "order-service",
exchange: "events",
routingKey: "OrderCreated");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (sender, ea) =>
{
var body = ea.Body.ToArray();
var json = Encoding.UTF8.GetString(body);
var @event = JsonSerializer.Deserialize<OrderCreatedEvent>(json);
// Handle the event
Console.WriteLine($"Order created: {@event.OrderId}");
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync(
queue: "order-service",
autoAck: false,
consumer: consumer);
How It Works
- Your application stores events in the outbox via
IEventOutbox.StoreAsyncwithin a database transaction. - The Pulse background processor polls the outbox for pending messages.
- For each message,
RabbitMqMessageTransportpublishes it to the configured exchange with a routing key resolved byITopicNameResolver. - RabbitMQ routes the message to bound queues based on the routing key and exchange type.
- On success, the message is marked as processed; on failure, it remains pending for the next poll cycle.
Performance Considerations
Batch Processing
Configure batch size and polling interval based on your throughput requirements:
.AddOutbox(processorOptions: options =>
{
options.BatchSize = 100; // Messages per poll cycle
options.PollingInterval = TimeSpan.FromSeconds(1);
})
Connection Management
Register IConnection as a singleton in your DI container. The RabbitMQ client library is thread-safe and designed for concurrent use, so a single shared connection is recommended.
Channel Management
Channels are created on demand and reused for subsequent sends. If a channel becomes closed, a new one is automatically created on the next send operation.
Requirements
- .NET 8.0, .NET 9.0, or .NET 10.0
- RabbitMQ 3.8+ (or compatible AMQP 0-9-1 broker)
RabbitMQ.Client7.0+ for async API supportMicrosoft.Extensions.DependencyInjectionfor service registrationMicrosoft.Extensions.Hostingfor the background processor
Related Packages
- NetEvolve.Pulse - Core mediator and outbox abstractions
- NetEvolve.Pulse.Extensibility - Core contracts and abstractions
- NetEvolve.Pulse.EntityFramework - Entity Framework Core persistence provider
- NetEvolve.Pulse.SqlServer - SQL Server ADO.NET persistence provider
- NetEvolve.Pulse.Polly - Polly v8 resilience policies integration
Documentation
For complete documentation, please visit the official documentation.
Contributing
Contributions are welcome! Please read the Contributing Guidelines before submitting a pull request.
Support
- Issues: Report bugs or request features on GitHub Issues
- Documentation: Read the full documentation at https://github.com/dailydevops/pulse
License
This project is licensed under the MIT License - see the LICENSE file for details.
Made with ❤️ by the NetEvolve Team
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.7)
- Microsoft.Extensions.Options (>= 10.0.7)
- NetEvolve.Pulse.Extensibility (>= 0.67.36)
- RabbitMQ.Client (>= 7.2.1)
-
net8.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.7)
- Microsoft.Extensions.Options (>= 10.0.7)
- NetEvolve.Pulse.Extensibility (>= 0.67.36)
- RabbitMQ.Client (>= 7.2.1)
-
net9.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.7)
- Microsoft.Extensions.Options (>= 10.0.7)
- NetEvolve.Pulse.Extensibility (>= 0.67.36)
- RabbitMQ.Client (>= 7.2.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.67.36 | 168 | 5/10/2026 |