DSeries.DOmniBus.Lite 1.0.5

dotnet add package DSeries.DOmniBus.Lite --version 1.0.5
                    
NuGet\Install-Package DSeries.DOmniBus.Lite -Version 1.0.5
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DSeries.DOmniBus.Lite" Version="1.0.5" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DSeries.DOmniBus.Lite" Version="1.0.5" />
                    
Directory.Packages.props
<PackageReference Include="DSeries.DOmniBus.Lite" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DSeries.DOmniBus.Lite --version 1.0.5
                    
#r "nuget: DSeries.DOmniBus.Lite, 1.0.5"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DSeries.DOmniBus.Lite@1.0.5
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DSeries.DOmniBus.Lite&version=1.0.5
                    
Install as a Cake Addin
#tool nuget:?package=DSeries.DOmniBus.Lite&version=1.0.5
                    
Install as a Cake Tool

DOmniBus.Lite

A single-project Kafka message bus for .NET. One call to set up, no pipeline behaviors, no shared contracts required.

The only external runtime dependency is Confluent.Kafka.

Repository layout

Path Description
DOmniBus.Lite/ The library (NuGet package source)
Sample.Lite/ Minimal end-to-end sample app
test/Unit.Lite/ Unit tests
DOmniBus.Lite.sln Solution file

Build & test

dotnet build DOmniBus.Lite.sln
dotnet test  DOmniBus.Lite.sln
dotnet run --project Sample.Lite

1. Install

Reference DOmniBus.Lite from your project:

<ProjectReference Include="path/to/DOmniBus.Lite/DOmniBus.Lite.csproj" />

The only external dependency is Confluent.Kafka.

2. Define Messages

Two message types: commands (fire-and-forget, one handler) and events (fan-out, many handlers).

using DOmniBus.Lite;

// Command — exactly one handler, no response
public sealed record CancelOrder(string OrderId) : MessageBase, ICommand;

// Event — zero or more handlers
public sealed record OrderShipped(string OrderId, DateTimeOffset ShippedAt) : MessageBase, IEvent;

MessageBase gives you MessageId, CorrelationId, CausationId, CreatedAt, and a Headers dictionary for free.

3. Write Handlers

public sealed class CancelOrderHandler : ICommandHandler<CancelOrder>
{
    public Task Handle(CancelOrder command, CancellationToken ct)
    {
        // your logic
        return Task.CompletedTask;
    }
}

public sealed class ShipmentNotifier : IEventHandler<OrderShipped>
{
    public Task Handle(OrderShipped @event, CancellationToken ct)
    {
        // your logic
        return Task.CompletedTask;
    }
}

public sealed class AuditLogger : IEventHandler<OrderShipped>
{
    public Task Handle(OrderShipped @event, CancellationToken ct)
    {
        // second handler — both fire when OrderShipped is published
        return Task.CompletedTask;
    }
}

Handlers are resolved from DI. Constructor injection works normally.

4. Wire Everything Up

One call. Topics, handlers, and Kafka settings — all in one place:

builder.Services.AddDOmniBusLite(bus =>
{
    bus.BootstrapServers = "localhost:9092";
    bus.GroupId = "my-service";

    bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
    bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped");
    bus.Map<OrderShipped, AuditLogger>("orders.shipped");
});

Map<TMessage, THandler>("topic") does three things:

  1. Routes the message type to a Kafka topic
  2. Registers the handler in DI as transient
  3. Registers the message type so the consumer can deserialize it

No other registration is needed. No services.AddTransient<>(), no TypeRegistry, no AddKafkaTransport().

Publisher-only services

If a service only sends a message type and never consumes it, use the handler-less overload:

bus.Map<OrderShipped>("orders.shipped");                       // producer-only
bus.Map<OrderShipped>("orders.shipped", wireType: "OrderShipped"); // with wireType

The service will produce to the topic when you call _bus.Publish(new OrderShipped(...)), but the built-in Kafka consumer will not subscribe to it. A service with only producer-only mappings won't start a consumer at all.

5. Dispatch Messages

Inject IMessageBus anywhere:

public class OrderService
{
    private readonly IMessageBus _bus;
    public OrderService(IMessageBus bus) => _bus = bus;

    public Task CancelAsync(string orderId, CancellationToken ct)
        => _bus.Execute(new CancelOrder(orderId), ct);

    public Task ShipAsync(string orderId, CancellationToken ct)
        => _bus.Publish(new OrderShipped(orderId, DateTimeOffset.UtcNow), ct);
}
Method Message Type What happens
Execute() ICommand Produces to Kafka topic, consumed by exactly one handler
Publish() IEvent Produces to Kafka topic, consumed by all registered handlers

6. How Routing Works

bus.Execute(new CancelOrder("A123"))
    │
    ├─ Is CancelOrder mapped to a topic?
    │     YES → produce to "orders.commands" on Kafka
    │     NO  → resolve ICommandHandler<CancelOrder> from DI, call Handle() directly
    │
    └─ Is the message inbound from Kafka? (x-domnibus-ingress header set)
          YES → always run locally (prevents infinite loops)

Messages without a Map call run in-process only — no Kafka involved.

Messages arriving from Kafka are stamped with x-domnibus-ingress before being dispatched through the bus, so they always execute locally on the consumer side.

7. Kafka Consumer

AddDOmniBusLite() automatically starts a BackgroundService that subscribes to all mapped topics. No extra call needed.

Single-threaded (default)

bus.MaxConcurrency = 1;  // default — one message at a time

Simplest mental model. Messages are processed sequentially, offsets committed after each.

Parallel

bus.MaxConcurrency = 8;  // 8 worker tasks process messages in parallel

Uses a bounded Channel<T> internally. Messages are fanned out to worker tasks. Offsets are committed after each message completes.

8. Dead-Letter Queue

bus.EnableDeadLetterQueue = true;  // default false

When enabled, messages that fail processing are forwarded to <topic>.dlq:

orders.commands → handler throws → orders.commands.dlq

The original payload and all headers are preserved. An error header is added with the exception message. The offset is committed so the consumer moves on.

When disabled, failed messages block the consumer (the offset is not committed and the message will be redelivered).

9. wireType — Independent Models Across Services

By default, DOmniBus.Lite stamps the full CLR type name in the Kafka message-type header:

message-type: "MyApp.Orders.OrderShipped, MyApp"

This means the consuming service must reference the same assembly to deserialize it. For microservices that don't want to share contracts, use wireType:

bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped", wireType: "OrderShipped");

Now the header is:

message-type: "OrderShipped"

How two services communicate without shared models

Service A — Publisher:

// Service A's own model
public sealed record OrderShipped(string OrderId, DateTimeOffset ShippedAt)
    : MessageBase, IEvent;

builder.Services.AddDOmniBusLite(bus =>
{
    bus.Map<OrderShipped, SomeHandler>("orders.shipped", wireType: "OrderShipped");
});

await bus.Publish(new OrderShipped("A123", DateTimeOffset.UtcNow));
// Kafka header: message-type = "OrderShipped"

Service B — Consumer (completely separate codebase):

// Service B's own model — different class name, different namespace
public sealed record ShipmentEvent(string OrderId, DateTimeOffset ShippedAt)
    : MessageBase, IEvent;

builder.Services.AddDOmniBusLite(bus =>
{
    bus.Map<ShipmentEvent, MyHandler>("orders.shipped", wireType: "OrderShipped");
    //                                                  ^^^^^^^^^^^^^^^^^^^^^^^^
    //                     matches what Service A stamps in the header
});

When the consumer receives a message with message-type: "OrderShipped", it maps to ShipmentEvent and deserializes the JSON payload by field name. Extra fields in the JSON are ignored, missing fields get default values.

Rules

Scenario What to do
Both services share the same assembly No wireType needed (default)
Independent models, same class name wireType: "OrderShipped" on both sides
Independent models, different class names wireType: "OrderShipped" on both sides — the wire name is the contract
Multiple handlers with wireType Use the same wireType string on all Map calls for that type

10. Configuration Reference

using Confluent.Kafka;

builder.Services.AddDOmniBusLite(bus =>
{
    // --- Kafka connection ---
    bus.BootstrapServers = "localhost:9092";   // default: "localhost:9092"
    bus.ClientId         = "my-service";       // default: "DOmniBus.Lite"
    bus.GroupId          = "my-service";       // default: "DOmniBus.Lite"

    // --- Security / auth (e.g. Confluent Cloud, MSK SASL, Aiven) ---
    bus.SecurityProtocol = SecurityProtocol.SaslSsl;
    bus.SaslMechanism    = SaslMechanism.Plain;
    bus.SaslUsername     = "<api-key>";
    bus.SaslPassword     = "<api-secret>";

    // --- Consumer ---
    bus.MaxConcurrency     = 8;                        // default: 1
    bus.AutoOffsetReset    = AutoOffsetReset.Earliest; // default: Earliest
    bus.SessionTimeoutMs   = 45_000;                   // default: library/broker default
    bus.MaxPollIntervalMs  = 300_000;                  // default: library/broker default
    bus.AutoCreateConsumerTopics = true;               // default: true
    bus.AutoCreateTopicNumPartitions = 1;              // default: 1
    bus.AutoCreateTopicReplicationFactor = 1;          // default: 1
    bus.TopicCreationRetryDelayMs = 5_000;             // default: 5000

    // --- Producer ---
    bus.EnableIdempotence = true;                      // default: true
    bus.Acks              = Acks.All;                  // default: broker/library default
    bus.CompressionType   = CompressionType.Zstd;      // default: none

    // --- Dead-letter queue ---
    bus.EnableDeadLetterQueue = true;                  // default: false
    bus.DeadLetterTopicSuffix = ".dlq";                // default: ".dlq"

    // --- Escape hatches for anything else Confluent.Kafka exposes ---
    bus.ConfigureProducer = cfg => cfg.LingerMs = 20;
    bus.ConfigureConsumer = cfg => cfg.FetchMinBytes = 1024;
    bus.ConfigureAdmin = cfg => cfg.ClientId = "my-service-admin";

    // --- Mappings ---
    bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
    bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped");
    bus.Map<OrderShipped, AuditLogger>("orders.shipped");

    // --- wireType (optional, per mapping) ---
    bus.Map<ShipmentEvent, MyHandler>("orders.shipped", wireType: "OrderShipped");
});

You can also load Kafka settings from configuration:

builder.Services.AddDOmniBusLite(bus =>
{
    var kafka = builder.Configuration.GetSection("Kafka");
    bus.BootstrapServers = kafka["BootstrapServers"] ?? bus.BootstrapServers;
    bus.GroupId = kafka["GroupId"] ?? bus.GroupId;

    bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
});

11. Full Example

// Program.cs
using Microsoft.Extensions.Hosting;
using DOmniBus.Lite;

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddDOmniBusLite(bus =>
{
    bus.BootstrapServers = "localhost:9092";
    bus.GroupId = "order-service";
    bus.MaxConcurrency = 4;
    bus.EnableDeadLetterQueue = true;

    bus.Map<CancelOrder, CancelOrderHandler>("orders.commands");
    bus.Map<OrderShipped, ShipmentNotifier>("orders.shipped");
    bus.Map<OrderShipped, AuditLogger>("orders.shipped");
});

await builder.Build().RunAsync();

// Messages.cs
public sealed record CancelOrder(string OrderId) : MessageBase, ICommand;
public sealed record OrderShipped(string OrderId, DateTimeOffset ShippedAt) : MessageBase, IEvent;

// Handlers.cs
public sealed class CancelOrderHandler : ICommandHandler<CancelOrder>
{
    private readonly ILogger<CancelOrderHandler> _logger;
    public CancelOrderHandler(ILogger<CancelOrderHandler> logger) => _logger = logger;

    public Task Handle(CancelOrder command, CancellationToken ct)
    {
        _logger.LogInformation("Order {OrderId} cancelled.", command.OrderId);
        return Task.CompletedTask;
    }
}

public sealed class ShipmentNotifier : IEventHandler<OrderShipped>
{
    private readonly ILogger<ShipmentNotifier> _logger;
    public ShipmentNotifier(ILogger<ShipmentNotifier> logger) => _logger = logger;

    public Task Handle(OrderShipped @event, CancellationToken ct)
    {
        _logger.LogInformation("Order {OrderId} shipped.", @event.OrderId);
        return Task.CompletedTask;
    }
}

public sealed class AuditLogger : IEventHandler<OrderShipped>
{
    private readonly ILogger<AuditLogger> _logger;
    public AuditLogger(ILogger<AuditLogger> logger) => _logger = logger;

    public Task Handle(OrderShipped @event, CancellationToken ct)
    {
        _logger.LogInformation("[AUDIT] Shipment for order {OrderId}.", @event.OrderId);
        return Task.CompletedTask;
    }
}

12. DOmniBus.Lite vs DOmniBus (Full)

DOmniBus.Lite DOmniBus (Full)
Projects 1 6 (Abstractions, Core, Behaviors, Hosting, Transport.Kafka, Generators)
Setup AddDOmniBusLite() AddDOmniBus() + AddKafkaTransport() + AddBusHosting() + AddDOmniBusBehaviors()
Message types ICommand, IEvent ICommand, IEvent, IRequest<TResponse>
Request/Reply No Yes (shared reply consumer over Kafka)
Pipeline behaviors No Logging, Validation, Tracing, Retry
HTTP ingress No /bus/request, /bus/command, /bus/event
Handler registration bus.Map<TMsg, THandler>("topic") Manual DI: services.AddTransient<>()
Serializer Hardcoded System.Text.Json Pluggable ISerializer
wireType bus.Map<T, H>("topic", wireType: "name") registry.RegisterAs<T>("name") + UseStructuralBinding flag

13. Troubleshooting

Handler not found: Ensure every message type has a Map<TMessage, THandler>() call, or is manually registered in DI for in-process-only messages.

Messages not consumed: Check that GroupId matches between producer and consumer services. Verify topics exist on the Kafka cluster.

Mapped topic does not exist yet: Handler-backed mappings are auto-created by default before the consumer subscribes, using 1 partition and replication factor 1. For production clusters where topics are managed by infrastructure, set AutoCreateConsumerTopics = false; the consumer will warn and retry instead of crashing until the topic exists.

DLQ messages: Inspect <topic>.dlq for the original payload. The error header contains the exception message.

wireType mismatch: Both services must use the exact same wireType string. The string is case-insensitive.

Deserialization errors with wireType: Field names must match between the two models (JSON property name matching, camelCase by default). Extra fields are ignored, missing fields get default values.

License

MIT

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.5 103 5/11/2026
1.0.4 156 4/24/2026
1.0.3 114 4/18/2026
1.0.2 92 4/18/2026
1.0.1 98 4/16/2026
1.0.0 103 4/15/2026