EasyRabbitFlow 4.0.0
RabbitFlow Documentation
Welcome to RabbitFlow, a streamlined .NET library for configuring RabbitMQ messaging with minimal ceremony and high performance.
Table of Contents
- Introduction
- Install
- Configuration
- Consumers
- Hosted Initialization (Recommended)
- Publishing Messages
- Queue State
- Temporary Message Processing
- Performance Notes
1. Introduction
RabbitFlow simplifies integration with RabbitMQ by:
- Auto-registering consumers with strongly typed settings.
- Optional automatic queue/exchange/dead-letter generation.
- Efficient retry, timeout, and error handling.
- High performance message processing (no per-message reflection).
It supports both pre-existing infrastructure and on-demand generation.
Let’s explore setup and usage.
2. Install
Install the RabbitFlow library from NuGet using the .NET CLI:
dotnet add package EasyRabbitFlow
3. Configuration
Register core services using AddRabbitFlow, then optionally start the hosted consumer service with UseRabbitFlowConsumers.
// JsonNamingPolicy and JsonIgnoreCondition come from these namespaces.
using System.Text.Json;
using System.Text.Json.Serialization;

builder.Services
    .AddRabbitFlow(cfg =>
    {
        cfg.ConfigureHost(host =>
        {
            host.Host = "rabbitmq.example.com";
            host.Username = "guest";
            host.Password = "guest";
        });

        cfg.ConfigureJsonSerializerOptions(json =>
        {
            json.PropertyNameCaseInsensitive = true;
            json.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
            json.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
        });

        cfg.ConfigurePublisher(pub => pub.DisposePublisherConnection = false);

        cfg.AddConsumer<EmailConsumer>("email-queue", c =>
        {
            c.PrefetchCount = 5;
            c.Timeout = TimeSpan.FromSeconds(2);
            c.AutoGenerate = true;

            c.ConfigureAutoGenerate(a =>
            {
                a.ExchangeName = "notifications";
                a.ExchangeType = ExchangeType.Fanout;
                a.GenerateDeadletterQueue = true;
            });

            c.ConfigureRetryPolicy(r =>
            {
                r.MaxRetryCount = 3;
                r.RetryInterval = 1000; // ms
                r.ExponentialBackoff = true;
                r.ExponentialBackoffFactor = 2;
            });
        });
    })
    .UseRabbitFlowConsumers(); // starts background consumption
3.1 Host Configuration
The ConfigureHost method allows you to specify the connection details for your RabbitMQ host:
opt.ConfigureHost(hostSettings =>
{
    hostSettings.Host = "rabbitmq.example.com";
    hostSettings.Username = "guest";
    hostSettings.Password = "guest";
});
3.2 JSON Serialization Options
This option allows you to globally configure how JSON serialization should be handled. This configuration is optional; if not provided, the default JsonSerializerOptions will be used.
opt.ConfigureJsonSerializerOptions(jsonSettings =>
{
    jsonSettings.PropertyNameCaseInsensitive = true;
    jsonSettings.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    jsonSettings.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});
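To make the effect of these three settings concrete, here is a minimal sketch that uses only System.Text.Json, without RabbitFlow. The EmailEvent model and the JsonOptionsDemo helper are hypothetical names introduced for this illustration.

```csharp
#nullable enable
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical message model, used only for this illustration.
public class EmailEvent
{
    public string Recipient { get; set; } = "";
    public string Subject { get; set; } = "";
    public string? Body { get; set; }
}

public static class JsonOptionsDemo
{
    // The same three settings as in the configuration above.
    public static JsonSerializerOptions CreateOptions() => new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true,                 // "RECIPIENT" binds to Recipient when reading
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,  // Recipient is written as "recipient"
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull // null Body is omitted when writing
    };

    public static string Serialize(EmailEvent message) =>
        JsonSerializer.Serialize(message, CreateOptions());

    public static EmailEvent? Deserialize(string json) =>
        JsonSerializer.Deserialize<EmailEvent>(json, CreateOptions());
}
```

With these options, serializing an EmailEvent whose Recipient is "alice", Subject is "Welcome", and Body is null produces {"recipient":"alice","subject":"Welcome"}, and a payload with upper-case property names such as "RECIPIENT" still deserializes into the same model.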
3.3 Publisher Options
Optionally, configure the publisher by setting the DisposePublisherConnection property.
This property determines whether the connection that the publisher establishes with RabbitMQ is kept alive (false, the default) or closed once publishing completes (true).
opt.ConfigurePublisher(publisherSettings => publisherSettings.DisposePublisherConnection = true);
4. Consumers
4.1 Adding Consumers
All consumers must implement the IRabbitFlowConsumer<TEvent> interface, where TEvent is the event or message model.
The AddConsumer method creates all required services and configuration and registers them in the DI container, ready for use.
Define and configure a consumer for a specific queue using AddConsumer:
opt.AddConsumer<EmailConsumer>("email-queue", consumerSettings =>
{
    consumerSettings.PrefetchCount = 1;
    consumerSettings.Timeout = TimeSpan.FromMilliseconds(500);
    consumerSettings.AutoGenerate = true;
    // The inner parameter is named autoGenerate so that it does not shadow the outer opt.
    consumerSettings.ConfigureAutoGenerate(autoGenerate =>
    {
        autoGenerate.DurableQueue = true;
        autoGenerate.DurableExchange = true;
        autoGenerate.ExclusiveQueue = false;
        autoGenerate.AutoDeleteQueue = false;
        autoGenerate.GenerateDeadletterQueue = true;
        autoGenerate.ExchangeType = ExchangeType.Direct;
        // ... other settings ...
    });
});
4.2 Retry Policies
You can configure a retry policy to handle message processing failures effectively.
By default, all exceptions related to timeout issues will be automatically retried if the retry mechanism is enabled.
Additionally, you can customize the retry logic by defining your own rules for handling specific use cases using the TranscientException class from the EasyRabbitFlow.Exceptions namespace.
consumerSettings.ConfigureRetryPolicy(retryPolicy =>
{
    retryPolicy.MaxRetryCount = 3;
    retryPolicy.RetryInterval = 1000; // ms
    retryPolicy.ExponentialBackoff = true;
    retryPolicy.ExponentialBackoffFactor = 2;
});
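The documentation does not state how the delay between retries is derived from these settings. As a rough sketch, assuming the common formula delay(n) = RetryInterval × ExponentialBackoffFactor^(n − 1) for retry attempt n, the schedule can be computed as follows. BackoffSketch is a hypothetical helper and is not part of the library.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSketch
{
    // Assumed formula (not confirmed by the library documentation):
    // delay(n) = retryIntervalMs * factor^(n - 1), for attempts n = 1..maxRetryCount.
    // Without exponential backoff, every attempt waits retryIntervalMs.
    public static IReadOnlyList<TimeSpan> Delays(
        int maxRetryCount, int retryIntervalMs, int factor, bool exponentialBackoff)
    {
        var delays = new List<TimeSpan>(maxRetryCount);
        for (int attempt = 1; attempt <= maxRetryCount; attempt++)
        {
            double multiplier = exponentialBackoff ? Math.Pow(factor, attempt - 1) : 1;
            delays.Add(TimeSpan.FromMilliseconds(retryIntervalMs * multiplier));
        }
        return delays;
    }
}
```

Under that assumption, the settings above (three retries, 1000 ms interval, factor 2) give waits of 1 s, 2 s, and 4 s, so a message that keeps failing spends about 7 s in retry delays before it is given up on.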
4.3 Consumer Interface Implementation
Consumers must implement the IRabbitFlowConsumer<TEvent> interface:
// Consumers must implement the IRabbitFlowConsumer<TEvent> interface:
public interface IRabbitFlowConsumer<TEvent>
{
    Task HandleAsync(TEvent message, CancellationToken cancellationToken);
}

// Example EmailConsumer
public class EmailConsumer : IRabbitFlowConsumer<EmailEvent>
{
    private readonly ILogger<EmailConsumer> _logger;

    public EmailConsumer(ILogger<EmailConsumer> logger)
    {
        _logger = logger;
    }

    public async Task HandleAsync(EmailEvent message, CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
        _logger.LogInformation("New email event received. Event:{event}", JsonSerializer.Serialize(message));
    }
}
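HandleAsync receives a CancellationToken, and consumers are configured with a Timeout. Assuming the library signals that timeout through this token (an assumption, since the documentation does not spell it out), a handler only stops early if it passes the token on to the work it awaits. The sketch below emulates that with a plain CancellationTokenSource; TimeoutSketch and its members are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Stand-in for the body of HandleAsync: slow work that observes the token.
    public static async Task SlowHandlerAsync(TimeSpan workDuration, CancellationToken cancellationToken)
    {
        await Task.Delay(workDuration, cancellationToken);
    }

    // Emulates a per-message timeout by cancelling the token after `timeout`.
    // Returns true if the handler finished, false if it was cancelled first.
    public static async Task<bool> RunWithTimeoutAsync(TimeSpan workDuration, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            await SlowHandlerAsync(workDuration, cts.Token);
            return true;
        }
        catch (OperationCanceledException)
        {
            // Task.Delay throws TaskCanceledException, which derives from OperationCanceledException.
            return false;
        }
    }
}
```

A handler that ignores the token would keep running after the timeout elapses, which is why forwarding it to every awaited call is usually the safer habit.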
5. Hosted Initialization (Recommended)
Consumers are automatically started by calling:
builder.Services.AddRabbitFlow(cfg => { /* config */ })
    .UseRabbitFlowConsumers();
Older manual initialization methods are deprecated.
6. Publishing Messages
Publisher Interface
Use the IRabbitFlowPublisher interface to publish messages to RabbitMQ.
The jsonOptions parameter overrides the globally configured JsonSerializerOptions for a single call.
The publisherId parameter is intended to identify the connection created with RabbitMQ.
public interface IRabbitFlowPublisher
{
    Task<bool> PublishAsync<TEvent>(
        TEvent message,
        string exchangeName,
        string routingKey,
        string publisherId = "",
        JsonSerializerOptions? jsonOptions = null) where TEvent : class;

    Task<bool> PublishAsync<TEvent>(
        TEvent message,
        string queueName,
        string publisherId = "",
        JsonSerializerOptions? jsonOptions = null) where TEvent : class;
}
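A short usage sketch for the two overloads follows. Both accept a message followed by strings, so named arguments (exchangeName, routingKey, queueName) make the intended overload explicit. The interface is copied from the listing above only so that the sketch compiles on its own; EmailEvent, WelcomeMailer, and RecordingPublisher are hypothetical names, and RecordingPublisher is an in-memory stand-in for tests rather than anything shipped with the package.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Copied from the interface listing above so the sketch is self-contained.
// In a real project, remove this copy and use the interface from the package.
public interface IRabbitFlowPublisher
{
    Task<bool> PublishAsync<TEvent>(TEvent message, string exchangeName, string routingKey,
        string publisherId = "", JsonSerializerOptions? jsonOptions = null) where TEvent : class;
    Task<bool> PublishAsync<TEvent>(TEvent message, string queueName,
        string publisherId = "", JsonSerializerOptions? jsonOptions = null) where TEvent : class;
}

// Hypothetical message model.
public class EmailEvent
{
    public string Recipient { get; set; } = "";
    public string Subject { get; set; } = "";
}

// Hypothetical application service that depends only on the publisher interface.
public class WelcomeMailer
{
    private readonly IRabbitFlowPublisher _publisher;
    public WelcomeMailer(IRabbitFlowPublisher publisher) => _publisher = publisher;

    // Exchange overload; a fanout exchange ignores the routing key, so it is left empty.
    public Task<bool> SendViaExchangeAsync(string recipient) =>
        _publisher.PublishAsync(
            new EmailEvent { Recipient = recipient, Subject = "Welcome" },
            exchangeName: "notifications",
            routingKey: "");

    // Queue overload.
    public Task<bool> SendViaQueueAsync(string recipient) =>
        _publisher.PublishAsync(
            new EmailEvent { Recipient = recipient, Subject = "Welcome" },
            queueName: "email-queue");
}

// In-memory stand-in: records the target instead of contacting RabbitMQ.
public class RecordingPublisher : IRabbitFlowPublisher
{
    public List<string> Targets { get; } = new List<string>();

    public Task<bool> PublishAsync<TEvent>(TEvent message, string exchangeName, string routingKey,
        string publisherId = "", JsonSerializerOptions? jsonOptions = null) where TEvent : class
    {
        Targets.Add($"exchange:{exchangeName}/{routingKey}");
        return Task.FromResult(true);
    }

    public Task<bool> PublishAsync<TEvent>(TEvent message, string queueName,
        string publisherId = "", JsonSerializerOptions? jsonOptions = null) where TEvent : class
    {
        Targets.Add($"queue:{queueName}");
        return Task.FromResult(true);
    }
}
```

Depending on the interface rather than a concrete publisher keeps services such as WelcomeMailer testable without a running broker.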
7. Queue State
The IRabbitFlowState interface allows you to access queue status information:
public interface IRabbitFlowState
{
    bool IsEmptyQueue(string queueName);
    uint GetQueueLength(string queueName);
    uint GetConsumersCount(string queueName);
    bool QueueHasConsumers(string queueName);
}
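One possible use of these methods is waiting for a queue to drain, for example before shutting down a worker. The following is a sketch only: the interface is copied from the listing above so the code compiles on its own, while QueueDrain and CountdownState are hypothetical helpers (CountdownState is a test stand-in that simulates a queue shrinking over time).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Copied from the interface listing above so the sketch is self-contained.
// In a real project, remove this copy and use the interface from the package.
public interface IRabbitFlowState
{
    bool IsEmptyQueue(string queueName);
    uint GetQueueLength(string queueName);
    uint GetConsumersCount(string queueName);
    bool QueueHasConsumers(string queueName);
}

public static class QueueDrain
{
    // Polls until the queue reports empty (returns true)
    // or the token is cancelled while waiting (returns false).
    public static async Task<bool> WaitUntilEmptyAsync(
        IRabbitFlowState state, string queueName, TimeSpan pollInterval, CancellationToken cancellationToken)
    {
        while (!state.IsEmptyQueue(queueName))
        {
            try
            {
                await Task.Delay(pollInterval, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                return false;
            }
        }
        return true;
    }
}

// Test stand-in: each emptiness check pretends one more message was consumed.
public sealed class CountdownState : IRabbitFlowState
{
    private uint _remaining;
    public CountdownState(uint initialLength) => _remaining = initialLength;

    public bool IsEmptyQueue(string queueName)
    {
        if (_remaining == 0) return true;
        _remaining--;
        return false;
    }

    public uint GetQueueLength(string queueName) => _remaining;
    public uint GetConsumersCount(string queueName) => 1;
    public bool QueueHasConsumers(string queueName) => true;
}
```

Polling is a deliberate simplification here; the poll interval trades broker load against how quickly the caller notices that the queue is empty.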
8. Temporary Message Processing
IRabbitFlowTemporary is a utility designed to simplify fire-and-forget style workflows where a batch of messages is sent to RabbitMQ, processed by handlers, and discarded — all within a temporary queue.
This is ideal for:
- Background jobs that need to process large collections of data (e.g., generating PDFs, sending emails, calculating reports), where you want to free up the main thread or HTTP request quickly and continue processing in the background.
- Ephemeral tasks that don't need long-term queues or persistent consumers.
- One-time batch processing, such as database cleanup, syncing remote systems, or running onboarding flows.
This approach lets you publish the work and return immediately, while the internal RabbitMQ mechanism ensures each message is processed asynchronously and independently — with timeout and cancellation support.
✨ How It Works
- A temporary, exclusive queue and exchange are created automatically for the message type.
- All messages are published to this queue and immediately consumed by an internal async handler.
- The temporary queue and exchange are deleted automatically after the process completes.
- You can specify:
  - Per-message timeout.
  - Degree of parallelism via prefetch.
  - A callback on completion.
  - A global cancellation token.
🔧 API Overview
Task<int> RunAsync<T>(
    IReadOnlyList<T> messages,
    Func<T, CancellationToken, Task> onMessageReceived,
    Action<int, int>? onCompleted = null,
    RunTemporaryOptions? options = null,
    CancellationToken cancellationToken = default
) where T : class;
Sample
public record InvoiceToProcess(string InvoiceId, decimal Amount);

public class InvoiceService
{
    private readonly IRabbitFlowTemporary _rabbitFlow;

    public InvoiceService(IRabbitFlowTemporary rabbitFlow)
    {
        _rabbitFlow = rabbitFlow;
    }

    // Not marked async: nothing is awaited here, the batch runs in the background.
    public Task StartBatchProcessingAsync(List<InvoiceToProcess> invoices)
    {
        // Fire-and-forget: the returned task is intentionally discarded,
        // so exceptions thrown by RunAsync itself are not observed by the caller.
        _ = _rabbitFlow.RunAsync(
            invoices,
            async (invoice, ct) =>
            {
                Console.WriteLine($"Processing invoice {invoice.InvoiceId}...");
                // Simulate long-running processing
                await Task.Delay(1000, ct);
                // You could do: Save to DB, call APIs, generate PDFs, etc.
                Console.WriteLine($"Finished invoice {invoice.InvoiceId}");
            },
            onCompleted: (success, errors) =>
            {
                Console.WriteLine($"Invoice batch complete. ✅ {success}, ❌ {errors}");
            },
            options: new RunTemporaryOptions
            {
                Timeout = TimeSpan.FromSeconds(5),
                PrefetchCount = 5,
                QueuePrefixName = "invoice",
                CorrelationId = Guid.NewGuid().ToString()
            }
        );

        Console.WriteLine("Batch dispatched; processing continues in the background.");
        return Task.CompletedTask;
    }
}
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.1 is compatible. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
.NETStandard 2.1 dependencies:
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)
- RabbitMQ.Client (>= 7.1.2)
- System.Text.Json (>= 9.0.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 4.0.2 | 470 | 10/31/2025 |
| 4.0.1 | 244 | 10/13/2025 |
| 4.0.0 | 194 | 10/12/2025 |
| 3.0.9 | 137 | 10/10/2025 |
| 3.0.8 | 142 | 10/10/2025 |
| 3.0.7 | 390 | 8/10/2025 |
| 3.0.6 | 377 | 6/19/2025 |
| 3.0.5 | 450 | 5/1/2025 |
| 3.0.4 | 194 | 4/25/2025 |
| 3.0.3 | 219 | 4/25/2025 |
| 3.0.2 | 193 | 4/25/2025 |
| 3.0.1 | 299 | 4/17/2025 |
| 3.0.0 | 260 | 4/17/2025 |
| 2.2.7 | 312 | 4/14/2025 |
| 2.2.6 | 274 | 4/13/2025 |
| 2.2.5 | 266 | 4/13/2025 |
| 2.2.4 | 193 | 4/11/2025 |
| 2.2.3 | 196 | 4/11/2025 |
| 2.2.2 | 210 | 4/11/2025 |
| 2.2.1 | 204 | 4/11/2025 |
| 2.1.1 | 1,048 | 8/29/2024 |
| 2.1.0 | 412 | 8/24/2024 |
| 2.0.0 | 220 | 8/23/2024 |
| 1.0.1 | 206 | 4/26/2024 |
| 1.0.0 | 240 | 10/3/2023 |