Sora.Messaging.RabbitMq
0.4.0
dotnet add package Sora.Messaging.RabbitMq --version 0.4.0
NuGet\Install-Package Sora.Messaging.RabbitMq -Version 0.4.0
<PackageReference Include="Sora.Messaging.RabbitMq" Version="0.4.0" />
<PackageVersion Include="Sora.Messaging.RabbitMq" Version="0.4.0" />
<PackageReference Include="Sora.Messaging.RabbitMq" />
paket add Sora.Messaging.RabbitMq --version 0.4.0
#r "nuget: Sora.Messaging.RabbitMq, 0.4.0"
#:package Sora.Messaging.RabbitMq@0.4.0
#addin nuget:?package=Sora.Messaging.RabbitMq&version=0.4.0
#tool nuget:?package=Sora.Messaging.RabbitMq&version=0.4.0
Sylin.Sora.Messaging.RabbitMq
RabbitMQ transport for Sora Messaging: connection factory, publisher, and consumer helpers.
- Target framework: net9.0
- License: Apache-2.0
Capabilities
- Alias-based routing to exchanges/queues, optional auto-provisioning
- Publisher confirms, prefetch/QoS, retry buckets (scheduled) and DLQ topology
- Health checks and diagnostics via Sora.Messaging.Core
Install
dotnet add package Sylin.Sora.Messaging.RabbitMq
Minimal setup
Use Sora bootstrap. When this package is referenced, the RabbitMQ provider is auto-registered.
var builder = WebApplication.CreateBuilder(args);
// Core wiring and auto-registrars
builder.Services.AddSora();
// Optional: explicit registration (not required when using AddSora)
// builder.Services.AddRabbitMq();
var app = builder.Build();
// Greenfield boot handled by host templates; ensure AppHost.Current is set and IAppRuntime started.
app.Run();
Quick config (appsettings.json):
{
"Sora": {
"Messaging": {
"DefaultBus": "rabbit",
"DefaultGroup": "workers",
"Buses": {
"rabbit": {
"ConnectionString": "amqp://guest:guest@localhost:5672",
// Or use ConnectionStringName to resolve from ConnectionStrings:{name}
// "ConnectionStringName": "RabbitMq",
"RabbitMq": {
"Exchange": "sora",
"Prefetch": 100,
"ProvisionOnStart": true,
"Dlq": { "Enabled": true },
"Retry": { "MaxAttempts": 5, "FirstDelaySeconds": 2, "Backoff": "exponential", "MaxDelaySeconds": 60 },
"Subscriptions": [
{ "Name": "workers", "RoutingKeys": "#", "Dlq": true, "Concurrency": 1 },
{ "Name": "billing", "RoutingKeys": ["Invoice.*", "Payment.#"], "Dlq": true, "Concurrency": 2, "Queue": "sora.rabbit.billing" }
]
}
}
}
}
}
}
Environment variables (equivalents; use double underscores):
- Sora__Messaging__DefaultBus=rabbit
- Sora__Messaging__Buses__rabbit__ConnectionString=amqp://guest:guest@localhost:5672
- Sora__ConnectionStrings__RabbitMq=amqp://guest:guest@localhost:5672
- Sora__Messaging__Buses__rabbit__ConnectionStringName=RabbitMq
- Sora__Messaging__Buses__rabbit__RabbitMq__Exchange=sora
- Sora__Messaging__Buses__rabbit__RabbitMq__Prefetch=100
- Sora__Messaging__Buses__rabbit__RabbitMq__ProvisionOnStart=true
- Sora__Messaging__Buses__rabbit__RabbitMq__Retry__MaxAttempts=5
Usage
Declare messages with aliases; use attributes for headers, idempotency, partitioning, and optional delay.
using Sora.Messaging;
[Message(Alias = "User.Registered", Version = 1)]
public sealed record UserRegistered(
string UserId,
[Header("x-tenant")] string Tenant,
[IdempotencyKey] string EventId,
[PartitionKey] string PartitionKey,
[DelaySeconds] int DelaySeconds = 0);
// Send (uses DefaultBus)
await new UserRegistered("u-123", "acme", "evt-1", "acme:u-123").Send();
// Send to a specific bus
await new UserRegistered("u-456", "acme", "evt-2", "acme:u-456").SendTo("rabbit");
Handle messages via DI sugar:
// Terse (no envelope)
builder.Services.On<UserRegistered>(msg => Console.WriteLine($"Welcome {msg.UserId}"));
// Or keep envelope when needed
builder.Services.OnMessage<UserRegistered>(async (env, msg, ct) =>
{
// Idempotency key and correlation are available in env.Headers / env.CorrelationId
// Do work; throw to trigger retry/DLQ per configured policy
await Task.CompletedTask;
});
Batch send:
var batch = new object[]
{
new UserRegistered("u-789", "acme", "evt-3", "acme:u-789"),
new UserRegistered("u-790", "acme", "evt-4", "acme:u-790")
};
await batch.Send(); // default bus
await batch.SendTo("rabbit"); // specific bus
Notes
- Subscriptions: if none are configured, the provider creates a default queue for
DefaultGroup
bound to#
. - Routing: the routing key is the message alias with an optional
.p{n}
partition suffix when[PartitionKey]
is present. - Scheduled send: set
[DelaySeconds]
to route through retry buckets for approximate delay delivery. - Idempotency: set
[IdempotencyKey]
on messages and configure an inbox store to de-duplicate on the consumer side.
Operations
- Health: a
RabbitMqHealth
contributor is registered; include health endpoints via Sora.Core. - Diagnostics: effective plan and capabilities are published via
IMessagingDiagnostics
. - DLQ/Retry: DLQs require
Dlq.Enabled=true
and subscriptions withDlq=true
; retries use TTL bucket queues behind a headers exchange.
Distribution patterns (minimal configs)
Round-robin (competing consumers on one queue)
{
"Sora": {
"Messaging": {
"DefaultBus": "rabbit",
"Buses": {
"rabbit": {
"ConnectionString": "amqp://guest:guest@localhost:5672",
"RabbitMq": {
"Exchange": "sora",
"Subscriptions": [
{ "Name": "workers", "RoutingKeys": ["#"], "Concurrency": 4 }
]
}
}
}
}
}
}
- Implementation details: all service instances use the same
Name
(group) so they share a single queue; multiple consumers (Concurrency or many instances) compete, yielding round-robin delivery.
Broadcast (pub/sub to multiple groups)
{
"Sora": {
"Messaging": {
"DefaultBus": "rabbit",
"Buses": {
"rabbit": {
"ConnectionString": "amqp://guest:guest@localhost:5672",
"RabbitMq": {
"Exchange": "sora",
"Subscriptions": [
{ "Name": "billing", "RoutingKeys": ["User.Registered"], "Concurrency": 1 },
{ "Name": "analytics", "RoutingKeys": ["User.Registered"], "Concurrency": 1 }
]
}
}
}
}
}
}
- Implementation details: each
Name
creates its own queue bound to the same routing key; every message is delivered to each queue (fan-out per group).
Selective topics (route by patterns)
{
"Sora": {
"Messaging": {
"Buses": {
"rabbit": {
"RabbitMq": {
"Exchange": "sora",
"Subscriptions": [
{ "Name": "billing", "RoutingKeys": ["Invoice.*", "Payment.#"] },
{ "Name": "fulfillment", "RoutingKeys": ["Order.Created", "Order.Shipped"] }
]
}
}
}
}
}
}
- Implementation details: topic exchange uses
*
(one segment) and#
(many segments); bind only the patterns each group needs.
Partition-aware (shard by key)
{
"Sora": {
"Messaging": {
"Buses": {
"rabbit": {
"RabbitMq": {
"Exchange": "sora",
"Subscriptions": [
{ "Name": "p0-7", "RoutingKeys": ["User.Registered.p0", "User.Registered.p1", "User.Registered.p2", "User.Registered.p3", "User.Registered.p4", "User.Registered.p5", "User.Registered.p6", "User.Registered.p7"] },
{ "Name": "p8-15", "RoutingKeys": ["User.Registered.p8", "User.Registered.p9", "User.Registered.p10", "User.Registered.p11", "User.Registered.p12", "User.Registered.p13", "User.Registered.p14", "User.Registered.p15"] }
]
}
}
}
}
}
}
- Implementation details: messages decorated with
[PartitionKey]
get a stable.p{0..15}
suffix on the routing key; split bindings across groups to shard load while preserving ordering within each queue. UseConcurrency=1
for strict per-queue ordering.
Troubleshooting
- Connection refused/timeout: verify
ConnectionString
/vhost and broker reachability; check firewall and credentials. - Provisioning errors (missing exchange/queue): enable
ProvisionOnStart
or pre-create topology with equivalent names; ensure account has configure/bind permissions. - Publisher confirms timeout: transient broker/backpressure; keep messages small, avoid long-running confirms; optionally set
PublisherConfirms=false
if your ops posture allows. - DLQ not receiving: set both
Dlq.Enabled=true
and subscriptionDlq=true
; ensure handler throws on failures so the consumer canNack
. - Scheduled send not delayed: require retry infra; set
Retry.MaxAttempts >= 2
to create TTL bucket queues; chooseFirstDelaySeconds
/MaxDelaySeconds
appropriately.
References
- Technical reference:
./TECHNICAL.md
- Messaging core reference:
../Sora.Messaging.Core/TECHNICAL.md
- Messaging overview:
/docs/reference/messaging.md
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net9.0
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.8)
- Microsoft.Extensions.Configuration.Binder (>= 9.0.8)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.8)
- Microsoft.Extensions.Http (>= 9.0.8)
- Newtonsoft.Json (>= 13.0.3)
- RabbitMQ.Client (>= 6.8.1)
- Sora.Core (>= 0.4.0)
- Sora.Messaging.Core (>= 0.4.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
See release notes: https://github.com/sylin-labs/sora-framework/releases