DSeries.DOmniBus.Lite
1.0.5
dotnet add package DSeries.DOmniBus.Lite --version 1.0.5
NuGet\Install-Package DSeries.DOmniBus.Lite -Version 1.0.5
<PackageReference Include="DSeries.DOmniBus.Lite" Version="1.0.5" />
<PackageVersion Include="DSeries.DOmniBus.Lite" Version="1.0.5" />
<PackageReference Include="DSeries.DOmniBus.Lite" />
paket add DSeries.DOmniBus.Lite --version 1.0.5
#r "nuget: DSeries.DOmniBus.Lite, 1.0.5"
#:package DSeries.DOmniBus.Lite@1.0.5
#addin nuget:?package=DSeries.DOmniBus.Lite&version=1.0.5
#tool nuget:?package=DSeries.DOmniBus.Lite&version=1.0.5
DOmniBus.Lite
A single-project Kafka message bus for .NET. One call to set up, no pipeline behaviors, no shared contracts required.
The only external runtime dependency is Confluent.Kafka.
Repository layout
| Path | Description |
|---|---|
DOmniBus.Lite/ |
The library (NuGet package source) |
Sample.Lite/ |
Minimal end-to-end sample app |
test/Unit.Lite/ |
Unit tests |
DOmniBus.Lite.sln |
Solution file |
Build & test
dotnet build DOmniBus.Lite.sln
dotnet test DOmniBus.Lite.sln
dotnet run --project Sample.Lite
1. Install
Reference DOmniBus.Lite from your project:
<ProjectReference Include="path/to/DOmniBus.Lite/DOmniBus.Lite.csproj" />
The only external dependency is Confluent.Kafka.
2. Define Messages
Two message types: commands (fire-and-forget, one handler) and events (fan-out, many handlers).
using DOmniBus.Lite;
// Command — exactly one handler, no response
public sealed record CancelOrder(string OrderId) : MessageBase, ICommand;
// Event — zero or more handlers
public sealed record OrderShipped(string OrderId, DateTimeOffset ShippedAt) : MessageBase, IEvent;
MessageBase gives you MessageId, CorrelationId, CausationId, CreatedAt, and a Headers dictionary for free.
3. Write Handlers
public sealed class CancelOrderHandler : ICommandHandler<CancelOrder>
{
public Task Handle(CancelOrder command, CancellationToken ct)
{
// your logic
return Task.CompletedTask;
}
}
public sealed class ShipmentNotifier : IEventHandler<OrderShipped>
{
public Task Handle(OrderShipped @event, CancellationToken ct)
{
// your logic
return Task.CompletedTask;
}
}
public sealed class AuditLogger : IEventHandler<OrderShipped>
{
public Task Handle(OrderShipped @event, CancellationToken ct)
{
// second handler — both fire when OrderShipped is published
return Task.CompletedTask;
}
}
Handlers are resolved from DI. Constructor injection works normally.
4. Wire Everything Up
One call. Topics, handlers, and Kafka settings — all in one place:
builder.Services.AddDOmniBusLite(bus =>
{
bus.BootstrapServers = "localhost:9092";
bus.GroupId = "my-service";
bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped");
bus.Map<OrderShipped, AuditLogger>("orders.shipped");
});
Map<TMessage, THandler>("topic") does three things:
- Routes the message type to a Kafka topic
- Registers the handler in DI as transient
- Registers the message type so the consumer can deserialize it
No other registration is needed. No services.AddTransient<>(), no TypeRegistry, no AddKafkaTransport().
Publisher-only services
If a service only sends a message type and never consumes it, use the handler-less overload:
bus.Map<OrderShipped>("orders.shipped"); // producer-only
bus.Map<OrderShipped>("orders.shipped", wireType: "OrderShipped"); // with wireType
The service will produce to the topic when you call _bus.Publish(new OrderShipped(...)), but the built-in Kafka consumer will not subscribe to it. A service with only producer-only mappings won't start a consumer at all.
5. Dispatch Messages
Inject IMessageBus anywhere:
public class OrderService
{
private readonly IMessageBus _bus;
public OrderService(IMessageBus bus) => _bus = bus;
public Task CancelAsync(string orderId, CancellationToken ct)
=> _bus.Execute(new CancelOrder(orderId), ct);
public Task ShipAsync(string orderId, CancellationToken ct)
=> _bus.Publish(new OrderShipped(orderId, DateTimeOffset.UtcNow), ct);
}
| Method | Message Type | What happens |
|---|---|---|
Execute() |
ICommand |
Produces to Kafka topic, consumed by exactly one handler |
Publish() |
IEvent |
Produces to Kafka topic, consumed by all registered handlers |
6. How Routing Works
bus.Execute(new CancelOrder("A123"))
│
├─ Is CancelOrder mapped to a topic?
│ YES → produce to "orders.commands" on Kafka
│ NO → resolve ICommandHandler<CancelOrder> from DI, call Handle() directly
│
└─ Is the message inbound from Kafka? (x-domnibus-ingress header set)
YES → always run locally (prevents infinite loops)
Messages without a Map call run in-process only — no Kafka involved.
Messages arriving from Kafka are stamped with x-domnibus-ingress before being dispatched through the bus, so they always execute locally on the consumer side.
7. Kafka Consumer
AddDOmniBusLite() automatically starts a BackgroundService that subscribes to all mapped topics. No extra call needed.
Single-threaded (default)
bus.MaxConcurrency = 1; // default — one message at a time
Simplest mental model. Messages are processed sequentially, offsets committed after each.
Parallel
bus.MaxConcurrency = 8; // 8 worker tasks process messages in parallel
Uses a bounded Channel<T> internally. Messages are fanned out to worker tasks. Offsets are committed after each message completes.
8. Dead-Letter Queue
bus.EnableDeadLetterQueue = true; // default false
When enabled, messages that fail processing are forwarded to <topic>.dlq:
orders.commands → handler throws → orders.commands.dlq
The original payload and all headers are preserved. An error header is added with the exception message. The offset is committed so the consumer moves on.
When disabled, failed messages block the consumer (the offset is not committed and the message will be redelivered).
9. wireType — Independent Models Across Services
By default, DOmniBus.Lite stamps the full CLR type name in the Kafka message-type header:
message-type: "MyApp.Orders.OrderShipped, MyApp"
This means the consuming service must reference the same assembly to deserialize it. For microservices that don't want to share contracts, use wireType:
bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped", wireType: "OrderShipped");
Now the header is:
message-type: "OrderShipped"
How two services communicate without shared models
Service A — Publisher:
// Service A's own model
public sealed record OrderShipped(string OrderId, DateTimeOffset ShippedAt)
: MessageBase, IEvent;
builder.Services.AddDOmniBusLite(bus =>
{
bus.Map<OrderShipped, SomeHandler>("orders.shipped", wireType: "OrderShipped");
});
await bus.Publish(new OrderShipped("A123", DateTimeOffset.UtcNow));
// Kafka header: message-type = "OrderShipped"
Service B — Consumer (completely separate codebase):
// Service B's own model — different class name, different namespace
public sealed record ShipmentEvent(string OrderId, DateTimeOffset ShippedAt)
: MessageBase, IEvent;
builder.Services.AddDOmniBusLite(bus =>
{
bus.Map<ShipmentEvent, MyHandler>("orders.shipped", wireType: "OrderShipped");
// ^^^^^^^^^^^^^^^^^^^^^^^^
// matches what Service A stamps in the header
});
When the consumer receives a message with message-type: "OrderShipped", it maps to ShipmentEvent and deserializes the JSON payload by field name. Extra fields in the JSON are ignored, missing fields get default values.
Rules
| Scenario | What to do |
|---|---|
| Both services share the same assembly | No wireType needed (default) |
| Independent models, same class name | wireType: "OrderShipped" on both sides |
| Independent models, different class names | wireType: "OrderShipped" on both sides — the wire name is the contract |
| Multiple handlers with wireType | Use the same wireType string on all Map calls for that type |
10. Configuration Reference
using Confluent.Kafka;
builder.Services.AddDOmniBusLite(bus =>
{
// --- Kafka connection ---
bus.BootstrapServers = "localhost:9092"; // default: "localhost:9092"
bus.ClientId = "my-service"; // default: "DOmniBus.Lite"
bus.GroupId = "my-service"; // default: "DOmniBus.Lite"
// --- Security / auth (e.g. Confluent Cloud, MSK SASL, Aiven) ---
bus.SecurityProtocol = SecurityProtocol.SaslSsl;
bus.SaslMechanism = SaslMechanism.Plain;
bus.SaslUsername = "<api-key>";
bus.SaslPassword = "<api-secret>";
// --- Consumer ---
bus.MaxConcurrency = 8; // default: 1
bus.AutoOffsetReset = AutoOffsetReset.Earliest; // default: Earliest
bus.SessionTimeoutMs = 45_000; // default: library/broker default
bus.MaxPollIntervalMs = 300_000; // default: library/broker default
bus.AutoCreateConsumerTopics = true; // default: true
bus.AutoCreateTopicNumPartitions = 1; // default: 1
bus.AutoCreateTopicReplicationFactor = 1; // default: 1
bus.TopicCreationRetryDelayMs = 5_000; // default: 5000
// --- Producer ---
bus.EnableIdempotence = true; // default: true
bus.Acks = Acks.All; // default: broker/library default
bus.CompressionType = CompressionType.Zstd; // default: none
// --- Dead-letter queue ---
bus.EnableDeadLetterQueue = true; // default: false
bus.DeadLetterTopicSuffix = ".dlq"; // default: ".dlq"
// --- Escape hatches for anything else Confluent.Kafka exposes ---
bus.ConfigureProducer = cfg => cfg.LingerMs = 20;
bus.ConfigureConsumer = cfg => cfg.FetchMinBytes = 1024;
bus.ConfigureAdmin = cfg => cfg.ClientId = "my-service-admin";
// --- Mappings ---
bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped");
bus.Map<OrderShipped, AuditLogger>("orders.shipped");
// --- wireType (optional, per mapping) ---
bus.Map<ShipmentEvent, MyHandler>("orders.shipped", wireType: "OrderShipped");
});
You can also load Kafka settings from configuration:
builder.Services.AddDOmniBusLite(bus =>
{
var kafka = builder.Configuration.GetSection("Kafka");
bus.BootstrapServers = kafka["BootstrapServers"] ?? bus.BootstrapServers;
bus.GroupId = kafka["GroupId"] ?? bus.GroupId;
bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
});
11. Full Example
// Program.cs
using Microsoft.Extensions.Hosting;
using DOmniBus.Lite;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddDOmniBusLite(bus =>
{
bus.BootstrapServers = "localhost:9092";
bus.GroupId = "order-service";
bus.MaxConcurrency = 4;
bus.EnableDeadLetterQueue = true;
bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped");
bus.Map<OrderShipped, AuditLogger>("orders.shipped");
});
await builder.Build().RunAsync();
// Messages.cs
public sealed record CancelOrder(string OrderId) : MessageBase, ICommand;
public sealed record OrderShipped(string OrderId, DateTimeOffset ShippedAt) : MessageBase, IEvent;
// Handlers.cs
public sealed class CancelOrderHandler : ICommandHandler<CancelOrder>
{
private readonly ILogger<CancelOrderHandler> _logger;
public CancelOrderHandler(ILogger<CancelOrderHandler> logger) => _logger = logger;
public Task Handle(CancelOrder command, CancellationToken ct)
{
_logger.LogInformation("Order {OrderId} cancelled.", command.OrderId);
return Task.CompletedTask;
}
}
public sealed class ShipmentNotifier : IEventHandler<OrderShipped>
{
private readonly ILogger<ShipmentNotifier> _logger;
public ShipmentNotifier(ILogger<ShipmentNotifier> logger) => _logger = logger;
public Task Handle(OrderShipped @event, CancellationToken ct)
{
_logger.LogInformation("Order {OrderId} shipped.", @event.OrderId);
return Task.CompletedTask;
}
}
public sealed class AuditLogger : IEventHandler<OrderShipped>
{
private readonly ILogger<AuditLogger> _logger;
public AuditLogger(ILogger<AuditLogger> logger) => _logger = logger;
public Task Handle(OrderShipped @event, CancellationToken ct)
{
_logger.LogInformation("[AUDIT] Shipment for order {OrderId}.", @event.OrderId);
return Task.CompletedTask;
}
}
12. DOmniBus.Lite vs DOmniBus (Full)
| DOmniBus.Lite | DOmniBus (Full) | |
|---|---|---|
| Projects | 1 | 6 (Abstractions, Core, Behaviors, Hosting, Transport.Kafka, Generators) |
| Setup | AddDOmniBusLite() |
AddDOmniBus() + AddKafkaTransport() + AddBusHosting() + AddDOmniBusBehaviors() |
| Message types | ICommand, IEvent |
ICommand, IEvent, IRequest<TResponse> |
| Request/Reply | No | Yes (shared reply consumer over Kafka) |
| Pipeline behaviors | No | Logging, Validation, Tracing, Retry |
| HTTP ingress | No | /bus/request, /bus/command, /bus/event |
| Handler registration | bus.Map<TMsg, THandler>("topic") |
Manual DI: services.AddTransient<>() |
| Serializer | Hardcoded System.Text.Json | Pluggable ISerializer |
| wireType | bus.Map<T, H>("topic", wireType: "name") |
registry.RegisterAs<T>("name") + UseStructuralBinding flag |
13. Troubleshooting
Handler not found: Ensure every message type has a Map<TMessage, THandler>() call, or is manually registered in DI for in-process-only messages.
Messages not consumed: Check that GroupId matches between producer and consumer services. Verify topics exist on the Kafka cluster.
Mapped topic does not exist yet: Handler-backed mappings are auto-created by default before the consumer subscribes, using 1 partition and replication factor 1. For production clusters where topics are managed by infrastructure, set AutoCreateConsumerTopics = false; the consumer will warn and retry instead of crashing until the topic exists.
DLQ messages: Inspect <topic>.dlq for the original payload. The error header contains the exception message.
wireType mismatch: Both services must use the exact same wireType string. The string is case-insensitive.
Deserialization errors with wireType: Field names must match between the two models (JSON property name matching, camelCase by default). Extra fields are ignored, missing fields get default values.
License
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.1 is compatible. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.1
- Confluent.Kafka (>= 2.4.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.1)
- Microsoft.Extensions.Options (>= 8.0.2)
- System.Text.Json (>= 8.0.5)
- System.Threading.Channels (>= 8.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.