Gizza.MessageBroker.Persistence.SqlServer 2.0.1

dotnet add package Gizza.MessageBroker.Persistence.SqlServer --version 2.0.1
                    

Gizza.MessageBroker

Gizza.MessageBroker is a high-performance, engine-agnostic messaging library for .NET. It provides a small core API for common messaging patterns while keeping broker-specific behavior inside adapter packages.

The project was designed as a lightweight alternative to large messaging frameworks when you want direct control over broker clients, topology, persistence, retries, and runtime behavior without depending on MassTransit.

Current Status

The core API is implemented. RabbitMQ, NATS, Kafka, AWS SQS/SNS, Azure Service Bus, Azure Event Hubs, Google Cloud Pub/Sub, Apache ActiveMQ, and in-memory adapters are available. The in-memory adapter is intended for tests and local development, not as a production broker replacement.

| Area | Status |
| --- | --- |
| Core API | Available |
| RabbitMQ adapter | Available |
| SQL Server persistence | Available |
| PostgreSQL persistence | Available |
| SQLite persistence | Available |
| Oracle persistence | Available |
| NATS adapter | Available |
| Kafka adapter | Available |
| AWS SQS/SNS adapter | Available |
| Azure Service Bus adapter | Available |
| Azure Event Hubs adapter | Available |
| Google Cloud Pub/Sub adapter | Available |
| Apache ActiveMQ adapter | Available |
| In-memory adapter | Available |

All library projects currently target:

net8.0;net9.0;net10.0

The sample application targets .NET 10.

Package Layout

Applications should code against the core package and add only the adapter/provider packages they need.

| Package | Purpose |
| --- | --- |
| Gizza.MessageBroker | Core abstractions, endpoint models, serializers, BrokerFactory, DI registration, diagnostics hooks, in-memory inbox/outbox stores |
| Gizza.MessageBroker.RabbitMQ | RabbitMQ transport adapter using the official RabbitMQ .NET client |
| Gizza.MessageBroker.Nats | NATS transport adapter using the official NATS .NET client |
| Gizza.MessageBroker.Kafka | Apache Kafka transport adapter using Confluent.Kafka |
| Gizza.MessageBroker.Aws | AWS SQS/SNS transport adapter using the official AWS SDK for .NET |
| Gizza.MessageBroker.Azure | Azure Service Bus and Event Hubs transport adapter using the official Azure SDK packages |
| Gizza.MessageBroker.GoogleCloud | Google Cloud Pub/Sub transport adapter using the official Google Cloud .NET client |
| Gizza.MessageBroker.ActiveMQ | Apache ActiveMQ Classic OpenWire adapter using the official Apache NMS ActiveMQ client |
| Gizza.MessageBroker.Persistence | Database-engine agnostic durable outbox/inbox/stream checkpoint infrastructure, SQL store base classes, cleanup jobs, Dapper-based implementation |
| Gizza.MessageBroker.Persistence.SqlServer | SQL Server persistence provider |
| Gizza.MessageBroker.Persistence.PostgreSql | PostgreSQL persistence provider |
| Gizza.MessageBroker.Persistence.Sqlite | SQLite persistence provider |
| Gizza.MessageBroker.Persistence.Oracle | Oracle persistence provider |
| Gizza.MessageBroker.InMemory | In-memory adapter for Queue, Topic, Stream, and Request contracts, intended for tests and local development |
| Gizza.MessageBroker.Samples | Console sample for Queue, Topic, Stream, Request, smoke testing, and DI smoke testing |

This mirrors the common NuGet pattern used by libraries such as Serilog: the core package defines the public programming model, and adapter packages provide concrete engines.

Messaging Patterns

Gizza.MessageBroker intentionally uses broker-neutral names in the public API. Application code describes the messaging intent; each adapter translates that intent to RabbitMQ, NATS, Kafka, AWS, Azure, Google Cloud, or another engine.

Queue

Use Queue when each published message should be handled by only one worker from a group of competing consumers.

Think of a Queue as a shared work line. Many services or many instances of the same service can listen to the same logical queue, but each individual message is assigned to one of them. This is the right model for commands and jobs where duplicate parallel handling would be wrong or wasteful.

Typical examples:

  • Send an email.
  • Process an order command.
  • Run an import/export job.
  • Execute one unit of background work.

Example: five worker instances listen to orders.submit. If 100 messages are published, the workers share those 100 messages between them. A single message is not broadcast to all five workers.

Gizza.MessageBroker API:

  • Publisher: IQueuePublisher
  • Consumer: IQueueConsumer
  • Options: QueueEndpointOptions

Engine mapping:

  • RabbitMQ: exchange + queue + binding, consumed by competing consumers.
  • NATS: subject + queue group.
  • Kafka: topic + shared group.id.
  • AWS: SQS queue.
  • Azure: Service Bus queue.
  • Google Cloud: Pub/Sub topic + shared subscription.
  • ActiveMQ: JMS queue with competing consumers.
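
The competing-consumer behavior described above can be illustrated without any broker. The following is a minimal sketch that uses only System.Threading.Channels from the .NET base library; it is not the Gizza.MessageBroker API, and CompetingConsumersSketch is a hypothetical name introduced for illustration. Five workers read from one shared channel, so each of the 100 messages is handled by exactly one of them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Usage: 100 messages are shared between 5 workers.
var handledBy = await CompetingConsumersSketch.RunAsync(messageCount: 100, workerCount: 5);
Console.WriteLine($"Handled {handledBy.Count} messages, each by exactly one worker.");

// Hypothetical illustration type; not part of Gizza.MessageBroker.
public static class CompetingConsumersSketch
{
    // Returns a map of message id -> id of the single worker that handled it.
    public static async Task<ConcurrentDictionary<int, int>> RunAsync(int messageCount, int workerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var handledBy = new ConcurrentDictionary<int, int>();

        // Every worker reads from the same channel, so a message goes to one worker only.
        var workers = Enumerable.Range(0, workerCount).Select(workerId => Task.Run(async () =>
        {
            await foreach (var messageId in queue.Reader.ReadAllAsync())
            {
                if (!handledBy.TryAdd(messageId, workerId))
                {
                    throw new InvalidOperationException($"Message {messageId} was delivered twice.");
                }
            }
        })).ToArray();

        for (var id = 0; id < messageCount; id++)
        {
            await queue.Writer.WriteAsync(id);
        }

        // Completing the writer lets the workers finish once the queue is drained.
        queue.Writer.Complete();
        await Task.WhenAll(workers);
        return handledBy;
    }
}
```

How the messages are split between workers is not deterministic; the only property the sketch relies on is that no message is handled twice.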

Topic

Use Topic when every logical subscriber should receive every published message.

Topic is fanout/pub-sub. The publisher does not care how many subscribers exist. Each subscriber represents an independent interest in the same event. If three services subscribe to the same topic, all three should see the same event once.

Typical examples:

  • Publish OrderCreated.
  • Notify accounting, risk, and analytics services.
  • Broadcast a strategy signal to multiple independent listeners.
  • Send domain events to several bounded contexts.

Example: orders.created is published once. Billing records an invoice, analytics updates metrics, and notification sends an email. These are different subscribers, so all of them receive the message.

Gizza.MessageBroker API:

  • Publisher: ITopicPublisher
  • Subscriber: ITopicSubscriber
  • Options: TopicEndpointOptions

Engine mapping:

  • RabbitMQ: exchange + one subscriber queue per subscription.
  • NATS: normal subject subscription, no queue group.
  • Kafka: same topic, different group.id per logical subscriber.
  • AWS: SNS topic + SQS subscription.
  • Azure: Service Bus topic + subscription.
  • Google Cloud: Pub/Sub topic + one subscription per logical subscriber.
  • ActiveMQ: JMS topic, usually with durable subscriptions for named subscribers.

Important distinction: a Topic is live fanout. It is not primarily an event log. If replay, retention, or offset control matters, use Stream.
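
To make the fanout semantics concrete, here is a small in-process sketch. It is not the Gizza.MessageBroker API: FanoutTopicSketch and its members are hypothetical names, and the per-subscriber queue is an assumption that loosely mirrors the "one queue or subscription per logical subscriber" mappings listed above. It also shows why a Topic is not an event log: a subscriber that joins after a publish does not see the earlier message.

```csharp
using System;
using System.Collections.Generic;

var topic = new FanoutTopicSketch();
topic.Subscribe("billing");
topic.Subscribe("analytics");
topic.Subscribe("notification");
topic.Publish("OrderCreated:42");

// A subscriber that joins after the publish receives nothing from the past.
topic.Subscribe("late-subscriber");

foreach (var name in new[] { "billing", "analytics", "notification", "late-subscriber" })
{
    Console.WriteLine($"{name}: {topic.Drain(name).Count} message(s)");
}

// Hypothetical illustration type; not part of Gizza.MessageBroker.
public sealed class FanoutTopicSketch
{
    // One independent queue per logical subscriber, keyed by subscription name.
    private readonly Dictionary<string, Queue<string>> _subscriptions = new();

    public void Subscribe(string subscriptionName)
    {
        _subscriptions.TryAdd(subscriptionName, new Queue<string>());
    }

    // Publishing copies the message into every subscription that exists right now.
    public void Publish(string message)
    {
        foreach (var queue in _subscriptions.Values)
        {
            queue.Enqueue(message);
        }
    }

    // Returns and removes everything currently waiting for one subscriber.
    public IReadOnlyList<string> Drain(string subscriptionName)
    {
        var queue = _subscriptions[subscriptionName];
        var messages = new List<string>(queue);
        queue.Clear();
        return messages;
    }
}
```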

Request

Use Request when one component needs a response from another component through the broker.

Request is broker-based RPC. A client sends a request message with correlation metadata, a handler processes it, and a response is routed back to the waiting client. This is useful when the caller wants a direct answer but the communication should still flow through the messaging infrastructure instead of HTTP.

Typical examples:

  • Ask a pricing service to calculate a value.
  • Ask a risk service whether an order is acceptable.
  • Run a command that must return a result to the caller.
  • Query a service that is only reachable through broker infrastructure.

Example: a trading strategy sends CalculatePositionSize. A risk service handles the request and replies with PositionSizeCalculated. The client waits until the response arrives or the timeout expires.

Gizza.MessageBroker API:

  • Client: IRequestClient
  • Handler: IRequestHandler
  • Options: RequestEndpointOptions

Engine mapping:

  • RabbitMQ: request queue + ReplyTo + CorrelationId.
  • NATS: native request/reply.
  • Kafka: request topic + reply topic + correlation metadata.
  • AWS: SQS request queue + reply queue + correlation metadata.
  • Azure: Service Bus request queue + reply queue + correlation metadata.
  • Google Cloud: Pub/Sub request topic + temporary reply topic/subscription + correlation metadata.
  • ActiveMQ: request queue + temporary reply queue + correlation metadata.

Request should be used carefully. It introduces waiting, timeout handling, and tighter coupling than Queue or Topic. For high-volume fire-and-forget workflows, Queue or Topic is usually a better fit.
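
The waiting and timeout handling mentioned above usually comes down to correlating a reply with a pending request. The sketch below shows one common way to do that with a correlation id and a TaskCompletionSource; it is an assumption about the general technique, not a description of how any Gizza adapter is implemented, and CorrelatedRequestClientSketch is a hypothetical name. It relies on Task.WaitAsync(TimeSpan), which is available from .NET 6 onward.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var client = new CorrelatedRequestClientSketch();

var answer = await client.SendAsync(
    send: correlationId =>
    {
        // Stand-in for a remote handler that replies from another thread.
        Task.Run(() => client.OnReply(correlationId, "accepted"));
    },
    timeout: TimeSpan.FromSeconds(2));

Console.WriteLine(answer);

// Hypothetical illustration type; not part of Gizza.MessageBroker.
public sealed class CorrelatedRequestClientSketch
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<string>> _pending = new();

    public async Task<string> SendAsync(Action<Guid> send, TimeSpan timeout)
    {
        var correlationId = Guid.NewGuid();
        var completion = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Register the pending request before sending so a fast reply cannot be missed.
        _pending[correlationId] = completion;

        try
        {
            send(correlationId);
            // Throws TimeoutException if no reply arrives in time.
            return await completion.Task.WaitAsync(timeout);
        }
        finally
        {
            // Forget the request so a late reply is dropped instead of leaking memory.
            _pending.TryRemove(correlationId, out _);
        }
    }

    // Called when a reply arrives; returns false for unknown or already expired requests.
    public bool OnReply(Guid correlationId, string response)
    {
        return _pending.TryRemove(correlationId, out var completion)
            && completion.TrySetResult(response);
    }
}
```

The caller has to decide what a TimeoutException means for its workflow, which is part of the tighter coupling that Request introduces compared with Queue or Topic.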

Stream

Use Stream when messages must be durable, ordered enough for the use case, and replayable from a known position.

Stream is an event log abstraction. Consumers can start from the next message, the first retained message, the last message, a numeric offset, or a timestamp, depending on engine support. This is the right model when the history matters, not only the live delivery.

Typical examples:

  • Market tick history.
  • Order book updates.
  • Audit/event feeds.
  • Event-sourced state rebuilds.
  • Analytics consumers that may need to replay data.

Example: a market data publisher writes every BTCUSDT tick to market.ticks. A live strategy starts from Next, while an analytics service starts from First or from a timestamp to rebuild its local state.

Gizza.MessageBroker API:

  • Publisher: IStreamPublisher
  • Consumer: IStreamConsumer
  • Options: StreamEndpointOptions

Engine mapping:

  • RabbitMQ: RabbitMQ stream queue.
  • NATS: JetStream stream + durable consumer.
  • Kafka: topic + offset assignment/consumer group.
  • AWS: Kinesis would fit this model, but it is outside the first AWS phase.
  • Azure: Event Hubs + consumer group checkpointing.
  • Google Cloud: outside the Pub/Sub adapter scope; Pub/Sub is modeled as Queue, Topic, and Request here.
  • ActiveMQ: outside the first ActiveMQ phase unless a provider-specific log/stream model is introduced.

Stream consumers can choose a replay start position:

var options = new StreamEndpointOptions
{
    Address = BrokerAddress.Stream("market.ticks"),
    ConsumerName = "analytics-reader",
    StartPosition = StreamStartPosition.First,
    Checkpoint = StreamCheckpointOptions.StoreBacked(new InMemoryStreamCheckpointStore())
};

Checkpointing is optional. When enabled, adapters that support it can save progress after successful Ack handling and resume from the last stored position on restart. Use InMemoryStreamCheckpointStore for process-local tests, or provider stores such as PostgreSqlStreamCheckpointStore, SqlServerStreamCheckpointStore, SqliteStreamCheckpointStore, and OracleStreamCheckpointStore when progress must survive process restarts.

Important distinction: a Stream is not just a Topic with more subscribers. Topic answers "who should receive this live event?" Stream answers "what durable log should this message be appended to, and where should each consumer read from?"
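
The difference between a live reader and a replaying reader can be sketched with an in-memory append-only list. This is an illustration only: StreamLogSketch and StartAt are hypothetical names, and treating "last" as the offset of the most recent entry is an assumption that real engines may interpret differently.

```csharp
using System;
using System.Collections.Generic;

var log = new StreamLogSketch();
log.Append("tick-1");
log.Append("tick-2");
log.Append("tick-3");

// A replaying consumer starts at the first retained entry; a live consumer starts after the tail.
var analyticsOffset = log.ResolveStart(StartAt.First);
var liveOffset = log.ResolveStart(StartAt.Next);
log.Append("tick-4");

Console.WriteLine("analytics: " + string.Join(", ", log.ReadFrom(analyticsOffset)));
Console.WriteLine("live: " + string.Join(", ", log.ReadFrom(liveOffset)));

// Hypothetical illustration types; not part of Gizza.MessageBroker.
public enum StartAt { First, Last, Next }

public sealed class StreamLogSketch
{
    private readonly List<string> _entries = new();

    // Appends an entry and returns its offset in the log.
    public long Append(string entry)
    {
        _entries.Add(entry);
        return _entries.Count - 1;
    }

    // Translates a symbolic start position into a concrete offset.
    public long ResolveStart(StartAt position) => position switch
    {
        StartAt.First => 0,
        StartAt.Last => Math.Max(0, _entries.Count - 1),
        StartAt.Next => _entries.Count,
        _ => throw new ArgumentOutOfRangeException(nameof(position))
    };

    // Reads every entry from the given offset up to the current tail.
    public IReadOnlyList<string> ReadFrom(long offset)
    {
        var start = (int)Math.Clamp(offset, 0, _entries.Count);
        return _entries.GetRange(start, _entries.Count - start);
    }
}
```

The analytics reader sees all four ticks, while the live reader sees only tick-4, because the log keeps history and each consumer chooses its own read position.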

Quick Start with RabbitMQ

Install the core package and the RabbitMQ adapter:

dotnet add package Gizza.MessageBroker
dotnet add package Gizza.MessageBroker.RabbitMQ

For local development from this repository, start RabbitMQ:

docker compose -f tests/docker-compose.yml up -d rabbitmq

RabbitMQ defaults used by the sample and integration tests:

Host: 127.0.0.1
Port: 5673
Username: gizza
Password: gizza
Connection string: amqp://gizza:gizza@127.0.0.1:5673/
Management UI: http://127.0.0.1:15673

Quick Start with NATS

Install the core package and the NATS adapter:

dotnet add package Gizza.MessageBroker
dotnet add package Gizza.MessageBroker.Nats

For local development from this repository, start NATS:

docker compose -f tests/docker-compose.yml --profile messaging up -d nats

NATS defaults used by integration tests:

Url: nats://127.0.0.1:14222
Monitoring: http://127.0.0.1:18222

Quick Start with Kafka

Install the core package and the Kafka adapter:

dotnet add package Gizza.MessageBroker
dotnet add package Gizza.MessageBroker.Kafka

For local development from this repository, start Kafka:

docker compose -f tests/docker-compose.yml --profile messaging up -d kafka

Kafka defaults used by integration tests:

Bootstrap servers: 127.0.0.1:19092
Container-internal bootstrap: kafka:9092

Basic Usage Examples

The examples below use RabbitMQ, but the same core endpoint code works with NATS or Kafka when the transport and broker options use BrokerEngine.NATS or BrokerEngine.Kafka.

Queue Publish

using Gizza.MessageBroker;

var brokerOptions = BrokerFactory.BuildBrokerOptions(
    BrokerEngine.RabbitMQ,
    host: "127.0.0.1",
    port: 5673,
    username: "gizza",
    password: "gizza",
    connectionString: "amqp://gizza:gizza@127.0.0.1:5673/");

await using var transport = BrokerFactory.BuildQueueTransport(BrokerEngine.RabbitMQ, brokerOptions);

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders")
};

var publisher = await transport.BuildQueuePublisherAsync(options);
await publisher.PublishAsync(new OrderCreated("BTCUSDT", 1));

public sealed record OrderCreated(string Symbol, int Quantity);

Queue Consume

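using Gizza.MessageBroker;

// brokerOptions is built as shown in the Queue Publish example above.
await using var transport = BrokerFactory.BuildQueueTransport(BrokerEngine.RabbitMQ, brokerOptions);

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders")
};

var consumer = await transport.BuildQueueConsumerAsync(options, async (context, ct) =>
{
    // Deserialize the payload with the endpoint's serializer.
    var message = options.Serializer.Deserialize<OrderCreated>(context.Message.Payload);
    // HandleOrderAsync stands for application-defined processing.
    await HandleOrderAsync(message, ct);
    // The message is acknowledged only after the handler has completed.
    return ConsumeResult.Ack();
});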

await consumer.StartAsync();

Consumer acknowledgements are controlled by the handler result. Gizza.MessageBroker does not ACK before the handler completes.

Common results:

  • ConsumeResult.Ack()
  • ConsumeResult.Requeue("temporary failure")
  • ConsumeResult.Nack("invalid message")
  • ConsumeResult.DeadLetter("poison message")
  • ConsumeResult.Defer(TimeSpan.FromSeconds(10), "retry later")

Delivery Order and Throughput

Endpoint consumption is ordered by default. QueueEndpointOptions, TopicEndpointOptions, StreamEndpointOptions, and RequestEndpointOptions default to PrefetchCount = 1; when MaxConcurrentMessages is not set, adapters run one handler at a time. RabbitMQ also defaults RabbitMqBrokerOptions.ConsumerDispatchConcurrency to 1, so the official client dispatch layer does not reorder handler callbacks. As a result, handlers observe messages in source order, where source order means the publish order of a single publisher or the broker-defined append order of a stream or log.

Applications that prefer throughput over strict handler order can opt into parallel consumption explicitly:

var options = new TopicEndpointOptions
{
    Address = BrokerAddress.Topic("market.signals"),
    PrefetchCount = 32,
    MaxConcurrentMessages = 8
};

var brokerOptions = new RabbitMqBrokerOptions
{
    ConsumerDispatchConcurrency = 8
};

PrefetchCount controls how many messages may be in flight from the broker. MaxConcurrentMessages controls how many handlers may run at the same time in the process. For RabbitMQ, ConsumerDispatchConcurrency controls how many deliveries the official client may dispatch concurrently before the adapter handler limit is applied. Once these settings are raised for parallel throughput, handler completion order and console/log output order are not guaranteed. With multiple concurrent publishers, the only stable ordering reference is the broker's accepted queue or stream order.
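
The effect of a handler concurrency limit can be sketched with a SemaphoreSlim. This is an assumption about the general technique, not the adapters' actual implementation; BoundedHandlerSketch is a hypothetical name, and the 10 ms delay stands in for real handler work. The sketch reports the highest number of handlers that were running at the same time, which never exceeds the configured limit.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var observedPeak = await BoundedHandlerSketch.RunAsync(messageCount: 40, maxConcurrentMessages: 8);
Console.WriteLine($"Peak concurrent handlers: {observedPeak} (limit 8)");

// Hypothetical illustration type; not part of Gizza.MessageBroker.
public static class BoundedHandlerSketch
{
    // Runs one simulated handler per message and returns the highest observed concurrency.
    public static async Task<int> RunAsync(int messageCount, int maxConcurrentMessages)
    {
        using var gate = new SemaphoreSlim(maxConcurrentMessages);
        var sync = new object();
        var running = 0;
        var peak = 0;

        var handlers = Enumerable.Range(0, messageCount).Select(async _ =>
        {
            // Wait for a free handler slot before doing any work.
            await gate.WaitAsync();
            try
            {
                lock (sync)
                {
                    running++;
                    peak = Math.Max(peak, running);
                }

                await Task.Delay(10); // simulated handler work
            }
            finally
            {
                lock (sync)
                {
                    running--;
                }

                gate.Release();
            }
        }).ToArray();

        await Task.WhenAll(handlers);
        return peak;
    }
}
```

With a limit above 1, the order in which handlers finish depends on scheduling, which is why completion order is not guaranteed once parallel consumption is enabled.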

Topic Publish and Subscribe

await using var transport = BrokerFactory.BuildTopicTransport(BrokerEngine.RabbitMQ, brokerOptions);

var publisherOptions = new TopicEndpointOptions
{
    Address = BrokerAddress.Topic("orders.events")
};

var subscriberOptions = new TopicEndpointOptions
{
    Address = BrokerAddress.Topic("orders.events"),
    SubscriptionName = "billing-service"
};

var subscriber = await transport.BuildTopicSubscriberAsync(subscriberOptions, (context, ct) =>
{
    var message = subscriberOptions.Serializer.Deserialize<OrderCreated>(context.Message.Payload);
    return Task.FromResult(ConsumeResult.Ack());
});

await subscriber.StartAsync();

var publisher = await transport.BuildTopicPublisherAsync(publisherOptions);
await publisher.PublishAsync(new OrderCreated("ETHUSDT", 2));

Stream Publish and Consume

await using var transport = BrokerFactory.BuildStreamTransport(BrokerEngine.RabbitMQ, brokerOptions);

var options = new StreamEndpointOptions
{
    Address = BrokerAddress.Stream("market.ticks"),
    ConsumerName = "analytics-reader",
    StartPosition = StreamStartPosition.First
};

var publisher = await transport.BuildStreamPublisherAsync(options);
await publisher.PublishAsync(new MarketTick("BTCUSDT", 95000m));

var consumer = await transport.BuildStreamConsumerAsync(options, (context, ct) =>
{
    var tick = options.Serializer.Deserialize<MarketTick>(context.Message.Payload);
    return Task.FromResult(ConsumeResult.Ack());
});

await consumer.StartAsync();

public sealed record MarketTick(string Symbol, decimal Price);

Request/Response

await using var transport = BrokerFactory.BuildRequestTransport(BrokerEngine.RabbitMQ, brokerOptions);

var options = new RequestEndpointOptions
{
    Address = BrokerAddress.Request("pricing.calculate"),
    Timeout = TimeSpan.FromSeconds(10)
};

var handler = await transport.BuildRequestHandlerAsync(options, (context, ct) =>
{
    var request = options.Serializer.Deserialize<PricingRequest>(context.Message.Payload);
    var response = new PricingResponse(request.Symbol, Price: 100m);
    return Task.FromResult(BrokerMessage.FromPayload(options.Serializer.Serialize(response), options.Serializer.ContentType));
});

await handler.StartAsync();

var client = await transport.BuildRequestClientAsync(options);
var result = await client.RequestAsync<PricingRequest, PricingResponse>(
    new PricingRequest("BTCUSDT"),
    TimeSpan.FromSeconds(10));

public sealed record PricingRequest(string Symbol);
public sealed record PricingResponse(string Symbol, decimal Price);

Dependency Injection

The core package contains DI registration. Adapter packages remain implementation packages.

using Gizza.MessageBroker;
using Microsoft.Extensions.DependencyInjection;

services.AddGizzaMessageBroker(builder =>
{
    builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
    builder.AddHealthChecks();
});

Named transports are resolved lazily through IMessageBrokerTransportRegistry:

var registry = serviceProvider.GetRequiredService<IMessageBrokerTransportRegistry>();
var transport = registry.GetTransport("rabbit");

Typed publishers and request clients can be registered as injectable services:

services.AddGizzaMessageBroker(builder =>
{
    builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
    builder.AddQueuePublisher<OrderCreated>("rabbit", new QueueEndpointOptions
    {
        Address = BrokerAddress.Queue("orders")
    });
    builder.AddTopicPublisher<OrderCreated>("rabbit", new TopicEndpointOptions
    {
        Address = BrokerAddress.Topic("orders.events")
    });
    builder.AddRequestClient<PricingRequest, PricingResponse>("rabbit", new RequestEndpointOptions
    {
        Address = BrokerAddress.Request("pricing.calculate")
    });
});

public sealed class OrderService
{
    private readonly IQueuePublisher<OrderCreated> _publisher;

    public OrderService(IQueuePublisher<OrderCreated> publisher)
    {
        _publisher = publisher;
    }

    public Task PublishAsync(OrderCreated message, CancellationToken ct)
    {
        return _publisher.PublishAsync(message, ct);
    }
}

Outbox dispatcher and persistence cleanup hosted services can also be registered through the builder when persistence is enabled.

The builder can register a standard Microsoft health check named gizza_messagebroker. By default it checks every registered transport and, when the adapter implements IMessageBrokerHealthProbe, performs a real connectivity probe. The RabbitMQ adapter opens or validates its connection during this check.

services.AddGizzaMessageBroker(builder =>
{
    builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
    builder.AddHealthChecks(options =>
    {
        options.Name = "message-broker-ready";
        options.AddTransport("rabbit");
    });
});

Hosted consumers and request handlers can be registered with the same builder. They are started and stopped through the application host:

services.AddGizzaMessageBroker(builder =>
{
    builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
    builder.AddHostedQueueConsumer(
        "rabbit",
        new QueueEndpointOptions
        {
            Address = BrokerAddress.Queue("orders")
        },
        async (context, ct) =>
        {
            var message = SystemTextJsonMessageSerializer.Default
                .Deserialize<OrderCreated>(context.Message.Payload);
            await HandleOrderAsync(message, ct);
            return ConsumeResult.Ack();
        });
});

Serialization

The default serializer is SystemTextJsonMessageSerializer, based on System.Text.Json.

Newtonsoft.Json is not used by the core library and no Newtonsoft compatibility layer is planned. Custom serialization can be added by implementing IMessageSerializer and assigning it to endpoint options.

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    Serializer = SystemTextJsonMessageSerializer.Default
};

Reliability Features

The core model includes:

  • Handler-controlled ACK/NACK
  • Retry policy and backoff
  • Delayed redelivery
  • Dead-letter policy
  • Fault message publishing
  • Message TTL and expiration metadata
  • Scheduled publish metadata
  • Correlation, causation, conversation, and request ids
  • Observer hooks for publish/consume diagnostics
  • Optional endpoint-level inbox and outbox persistence

Persistence is intentionally disabled by default. High-frequency flows such as ticker data, order book updates, market data, and signals should usually stay in direct mode. Critical commands and state transitions can enable outbox and/or inbox per endpoint.

Persistence

Persistence is split into a database-neutral package and provider packages.

Install the common persistence package plus the database provider you need:

dotnet add package Gizza.MessageBroker.Persistence
dotnet add package Gizza.MessageBroker.Persistence.PostgreSql

Available providers:

  • SQL Server
  • PostgreSQL
  • SQLite
  • Oracle

The persistence layer supports:

  • Durable outbox storage
  • Durable inbox/idempotency storage
  • Durable stream checkpoint storage
  • Provider-specific schema creation
  • Provider-specific SQL dialects
  • Batch claim and lock strategies
  • Expired lock reset
  • Published outbox cleanup when explicit retention is configured
  • Expired inbox cleanup

Published outbox cleanup is opt-in by retention. This is deliberate: the library will not delete published outbox records unless the application explicitly defines how long those records should be retained.

Endpoint-Level Outbox

var outboxStore = new PostgreSqlOutboxStore(postgreSqlOptions);

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    Persistence = EndpointPersistenceOptions.OutboxOnly(
        OutboxPersistenceOptions.StoreBacked(outboxStore))
};

When outbox is enabled, the endpoint publisher writes to the outbox store. OutboxDispatcher or OutboxDispatcherHostedService then publishes pending messages to the broker.
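
The two-step flow described above, store first and dispatch later, can be sketched in memory. This is not the library's OutboxDispatcher: OutboxSketch and its members are hypothetical names, and the list stands in for a durable database table. A record is marked as published only after the send succeeds, so a failed send leaves it pending for the next dispatch run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var outbox = new OutboxSketch();
outbox.Enqueue("OrderCreated:1");
outbox.Enqueue("OrderCreated:2");

var sentToBroker = new List<string>();
var dispatched = outbox.DispatchPending(sentToBroker.Add);
Console.WriteLine($"Dispatched {dispatched}, still pending {outbox.PendingCount}");

// Hypothetical illustration type; not part of Gizza.MessageBroker.
public sealed class OutboxSketch
{
    private sealed class Record
    {
        public Record(string payload) => Payload = payload;
        public string Payload { get; }
        public bool Published { get; set; }
    }

    private readonly List<Record> _records = new();

    public int PendingCount => _records.Count(r => !r.Published);

    // The publisher only stores the message; nothing reaches the broker yet.
    public void Enqueue(string payload) => _records.Add(new Record(payload));

    // Publishes pending records in order and returns how many were sent.
    public int DispatchPending(Action<string> publishToBroker)
    {
        var dispatched = 0;
        foreach (var record in _records.Where(r => !r.Published))
        {
            try
            {
                publishToBroker(record.Payload);
            }
            catch (Exception)
            {
                // Leave this record pending and stop; a later run retries it.
                break;
            }

            record.Published = true;
            dispatched++;
        }

        return dispatched;
    }
}
```

Because a send can succeed while the "published" update is lost in a real system, an outbox generally gives at-least-once delivery, which is one reason to pair it with idempotent consumers.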

Endpoint-Level Inbox

var inboxStore = new PostgreSqlInboxStore(postgreSqlOptions);

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    Persistence = EndpointPersistenceOptions.InboxOnly(
        InboxPersistenceOptions.StoreBacked(inboxStore, "orders-worker"))
};

Inbox persistence is used for idempotent consume behavior. It is independent from outbox persistence.
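
Idempotent consumption can be sketched as a lookup on the pair of consumer name and message id before the handler runs. The following is an in-memory illustration under that assumption, not the behavior of the provider inbox stores; InboxSketch is a hypothetical name, and the HashSet stands in for a durable table.

```csharp
using System;
using System.Collections.Generic;

var inbox = new InboxSketch();
var handledCount = 0;

// The same message id arrives twice, for example after a broker redelivery.
inbox.TryHandle("orders-worker", "msg-1", () => handledCount++);
inbox.TryHandle("orders-worker", "msg-1", () => handledCount++);

Console.WriteLine($"Handler ran {handledCount} time(s)");

// Hypothetical illustration type; not part of Gizza.MessageBroker.
public sealed class InboxSketch
{
    private readonly HashSet<(string Consumer, string MessageId)> _processed = new();

    // Runs the handler only if this consumer has not processed the message before.
    public bool TryHandle(string consumerName, string messageId, Action handler)
    {
        var key = (consumerName, messageId);
        if (_processed.Contains(key))
        {
            return false; // duplicate delivery: skip the handler
        }

        handler();

        // Record the message only after the handler succeeded, so a failure can be retried.
        _processed.Add(key);
        return true;
    }
}
```

Keying on the consumer name as well as the message id lets two different consumers each process the same message once.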

RabbitMQ Adapter Notes

RabbitMQ currently supports:

  • Queue pattern
  • Topic pattern
  • Stream pattern using RabbitMQ stream queues
  • Request pattern
  • Ordered-by-default consumers with opt-in prefetch and bounded handler concurrency
  • Dead-letter routing
  • Delayed redelivery through TTL retry queues
  • Scheduled publish through delay queues
  • Request timeout, cancellation, and late-response cleanup
  • Role-separated RabbitMQ channels for topology, publishers, consumers, and request flows
  • Automatic connection/channel recovery and consumer re-subscribe verified by controlled container restart integration tests
  • Stream consumer identity plus first, last, next, numeric offset, and timestamp replay starts

RabbitMQ-specific details are configured through transport options:

using Gizza.MessageBroker.RabbitMQ;

var transportOptions = new RabbitMqTransportOptions()
    .UseExchangeName("orders.exchange")
    .UseQueueName("orders.queue")
    .UseDeadLetter("orders.dead.exchange", "orders.dead")
    .UseDelayedRedelivery()
    .UseDelayedDelivery()
    .ToTransportOptions();

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders", "orders.route"),
    TransportOptions = transportOptions
};

The adapter keeps one connection per transport and separates channel ownership by responsibility. Topology declarations use a serialized topology channel; each publisher owns a publish channel; each consumer, subscriber, request client, and request handler owns its runtime channel. Publish and request-client operations are serialized per endpoint instance so a single injected publisher or request client can be used safely from concurrent callers.

RabbitMQ connection recovery uses explicit broker options:

var brokerOptions = new RabbitMqBrokerOptions
{
    ConnectionString = "amqp://gizza:gizza@127.0.0.1:5673/",
    AutomaticRecoveryEnabled = true,
    TopologyRecoveryEnabled = true,
    NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
    RequestedHeartbeat = TimeSpan.FromSeconds(30)
};

Kafka Adapter Notes

Kafka currently supports:

  • Queue pattern with a shared consumer group
  • Topic pattern with one consumer group per logical subscriber
  • Stream pattern with first, last, next, numeric offset, and timestamp replay starts
  • Request pattern with a reply topic and correlation metadata
  • Optional endpoint outbox/inbox integration
  • Ordered-by-default partition consumption with opt-in parallel handler execution controlled by MaxConcurrentMessages
  • Retry/backoff through the shared RetryPolicyOptions contract
  • Dead-letter topic publishing for Nack, DeadLetter, and exhausted retry results
  • Health probing through Kafka metadata

Kafka-specific details are configured through transport options:

using Gizza.MessageBroker.Kafka;

var transportOptions = new KafkaTransportOptions()
    .UseTopic("orders.created")
    .UseConsumerGroup("orders-workers")
    .UseDeadLetterTopic("orders.created.dlq")
    .UseTopicTopology(partitionCount: 3, replicationFactor: 1)
    .ToTransportOptions();

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders.created"),
    TransportOptions = transportOptions
};

Kafka consumer acknowledgements map to offset commits. The adapter keeps Kafka commit, seek, pause, and resume calls on the consumer loop thread, while handlers can run in parallel across assigned partitions. A partition is paused while its active message is being handled, so ordering remains stable within that partition. Requeue, delay-free Defer, and handler exceptions can use RetryPolicyOptions for seek-based redelivery with backoff; exhausted retries are published to the configured dead-letter topic.

ActiveMQ Adapter Notes

The ActiveMQ adapter uses the official Apache NMS ActiveMQ OpenWire client. It currently supports Queue, Topic, durable topic subscriptions, Request/Reply with a temporary reply queue, and dead-letter copy for Nack / DeadLetter consume results.

ActiveMQ consumers use IndividualAcknowledge sessions. Ack acknowledges the source message, Requeue and Defer recover the session for broker redelivery, and Nack / DeadLetter publish a copy to the configured DLQ before acknowledging the source message. The Gizza envelope is preserved and gizza-dead-letter-reason is added to the copied message.

using Gizza.MessageBroker.ActiveMQ;

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    DeadLetterPolicy = DeadLetterPolicyOptions.ToQueue("orders.dlq"),
    TransportOptions = new ActiveMqTransportOptions()
        .UseQueueName("orders")
        .UseDeadLetterQueueName("orders.dlq")
        .ToTransportOptions()
};

AWS Adapter Notes

The AWS adapter currently supports SQS queue publish/consume, SQS dead-letter redrive configuration, SQS request/reply, SNS topic publish, and SNS + SQS topic subscriptions.

SQS consumers poll with bounded concurrency. Ack and Nack delete the message from the queue. Requeue, Defer, and handler exceptions use ChangeMessageVisibility; Defer(TimeSpan) maps the delay to the message visibility timeout.

When DeadLetterPolicyOptions is configured, the consumer sets the SQS RedrivePolicy on the source queue. Nack and DeadLetter also publish a copy to the configured DLQ and then delete the source message, preserving the Gizza envelope and adding gizza-dead-letter-reason.

Topic subscribers use SNS fanout with one SQS queue per logical subscription. On start, the adapter resolves the queue ARN, optionally merges the required SNS sqs:SendMessage policy statement into the queue policy, subscribes the queue to the topic, and then polls SQS through the same consumer pipeline. Raw SNS delivery is enabled by default so the Gizza envelope and message attributes stay intact.

Request/reply uses a request SQS queue and a temporary reply SQS queue owned by the client. The client writes ReplyTo metadata into the Gizza envelope, polls the reply queue, and correlates responses by RequestId. The handler sends the response to the provided reply queue and ACKs the request only after the response publish succeeds.

AWS-specific details are configured through transport options:

using Gizza.MessageBroker.Aws;

var transportOptions = new AwsTransportOptions()
    .UseQueueName("orders")
    .UseQueuePolling(waitTimeSeconds: 1, visibilityTimeoutSeconds: 30, maxMessagesPerPoll: 1)
    .ToTransportOptions();

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    DeadLetterPolicy = DeadLetterPolicyOptions.ToQueue("orders.dlq"),
    TransportOptions = transportOptions,
    PrefetchCount = 1,
    MaxConcurrentMessages = 1
};

SQS redrive count can be configured through AWS transport options:

var transportOptions = new AwsTransportOptions()
    .UseQueueName("orders")
    .UseSqsRedrive(maxReceiveCount: 5)
    .ToTransportOptions();

FIFO metadata is opt-in:

var fifoOptions = new AwsTransportOptions()
    .UseQueueName("orders.fifo")
    .UseFifo(messageGroupId: "orders")
    .ToTransportOptions();

SNS + SQS topic subscription:

var subscriberOptions = new TopicEndpointOptions
{
    Address = BrokerAddress.Topic("orders.created", "billing"),
    SubscriptionName = "billing",
    DurableSubscription = true,
    TransportOptions = new AwsTransportOptions()
        .UseTopicArn("arn:aws:sns:us-east-1:123456789012:orders-created")
        .UseQueueName("billing-orders-created")
        .UseQueuePolling(waitTimeSeconds: 1, visibilityTimeoutSeconds: 30, maxMessagesPerPoll: 1)
        .UseRawSnsDelivery()
        .ManageSnsSqsQueuePolicy()
        .ToTransportOptions()
};

SQS request/reply:

var options = new RequestEndpointOptions
{
    Address = BrokerAddress.Request("pricing.requests"),
    Timeout = TimeSpan.FromSeconds(10),
    TransportOptions = new AwsTransportOptions()
        .UseQueueName("pricing-requests")
        .UseQueuePolling(waitTimeSeconds: 1, visibilityTimeoutSeconds: 30, maxMessagesPerPoll: 1)
        .ToTransportOptions()
};

Google Cloud Pub/Sub Adapter Notes

The Google Cloud adapter currently supports Pub/Sub Queue, Topic, and Request workflows. Stream replay is intentionally not exposed by this adapter; use Kafka, NATS JetStream, RabbitMQ streams, or Azure Event Hubs when offset/replay semantics are required.

Queue maps to a Pub/Sub topic plus one shared subscription. Topic maps to a Pub/Sub topic plus one subscription per logical subscriber. Request/reply uses a request topic and a temporary reply topic/subscription owned by the client.

Message ordering is enabled by default through Pub/Sub ordering keys. If no explicit ordering key is provided, the adapter uses the ordering key "default" for every message, which favors deterministic delivery over maximum parallelism. Raise MaxConcurrentMessages, PrefetchCount, or MaxMessagesPerPull, or disable ordering, only when the endpoint can tolerate out-of-order handling.

using Gizza.MessageBroker.GoogleCloud;

var brokerOptions = new GoogleCloudBrokerOptions
{
    ProjectId = "my-gcp-project"
};

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    PrefetchCount = 1,
    MaxConcurrentMessages = 1,
    TransportOptions = new GoogleCloudTransportOptions()
        .UseTopic("orders-topic")
        .UseSubscription("orders-workers")
        .UseOrdering()
        .UsePulling(maxMessagesPerPull: 1, ackDeadlineSeconds: 10)
        .ToTransportOptions()
};
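
To see why a single ordering key trades parallelism for determinism, the sketch below models ordered delivery as one sequential lane per key, with lanes running concurrently. DispatchAsync is a hypothetical in-memory stand-in, not the Pub/Sub client or the adapter; with every message on the "default" key there is only one lane, so handling is strictly one at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical dispatcher: one sequential lane per ordering key, lanes run concurrently.
static Task DispatchAsync(IEnumerable<(string OrderingKey, string Body)> messages, Func<string, Task> handle) =>
    Task.WhenAll(messages
        .GroupBy(m => m.OrderingKey)
        .Select(async lane =>
        {
            // Within a lane, the next message starts only after the previous one finishes.
            foreach (var message in lane)
                await handle(message.Body);
        }));

var handled = new List<string>();
await DispatchAsync(
    new[] { ("default", "created"), ("default", "paid"), ("default", "shipped") },
    async body => { await Task.Delay(1); handled.Add(body); });

Console.WriteLine(string.Join(", ", handled)); // created, paid, shipped
```

Spreading messages over several keys (for example, one per order ID) would create several lanes and restore parallelism while keeping per-key order.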

Local Development

Restore and build:

dotnet restore "Gizza Message Broker.slnx"
dotnet build "Gizza Message Broker.slnx" -c Release --nologo

Start test services:

docker compose -f tests\docker-compose.yml up -d rabbitmq sqlserver-init postgres oracle
docker compose -f tests\docker-compose.yml --profile messaging up -d nats kafka activemq
docker compose -f tests\docker-compose.yml --profile cloud up -d localstack google-pubsub azure-servicebus azure-eventhubs

Run tests:

dotnet test tests\Gizza.MessageBroker.UnitTests\Gizza.MessageBroker.UnitTests.csproj -c Release --no-build --nologo
dotnet test tests\Gizza.MessageBroker.IntegrationTests\Gizza.MessageBroker.IntegrationTests.csproj -c Release --no-build --nologo

The integration test project contains a reusable provider contract suite for Queue, Topic, Request, and Stream behavior. RabbitMQ, NATS, Kafka, AWS, Azure, Google Cloud, and ActiveMQ are wired into that suite where the provider supports the relevant pattern.

Run the BenchmarkDotNet suite:

dotnet run --project tests\Gizza.MessageBroker.Benchmarks\Gizza.MessageBroker.Benchmarks.csproj -c Release -f net10.0 -- --filter *

Run the samples:

dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- smoke
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- di-smoke
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- stream-publish 25
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- stream-consume analytics-reader

Available sample commands:

queue-publish
queue-consume
topic-publish
topic-subscribe
request-client
request-handler
stream-publish
stream-consume
stream-replay
smoke
di-smoke

Roadmap

Near-term work:

  • Keep Saga/orchestration preparation design-only until the orchestration package boundary is explicitly opened.
  • Expand benchmarks with real broker scenarios as adapter-level tuning decisions are made.

Out of initial cloud scope:

  • AWS EventBridge
  • AWS Kinesis

Design Principles

  • Keep the public API broker-neutral.
  • Use official provider clients in adapter packages.
  • Keep persistence optional and endpoint-scoped.
  • Favor direct broker hot paths for high-frequency traffic.
  • Use outbox/inbox only for flows that need durability or idempotency.
  • Keep adapter-specific topology and protocol details out of core application code.
  • Make each feature independently testable without requiring a full application host.

Notes for Future Maintainers and Codex

If you are revisiting this project later, this is the short mental model:

  • Gizza.MessageBroker is the core API. Do not put RabbitMQ, Kafka, NATS, AWS, Azure, Google Cloud, or ActiveMQ protocol logic there.
  • Adapter packages translate core contracts to concrete broker operations.
  • Persistence follows the same plugin shape: common API in Gizza.MessageBroker.Persistence, database-specific providers in separate packages.
  • Queue means one consumer handles each message.
  • Topic means every subscriber receives each message.
  • Stream means a durable, replayable event log.
  • Request means broker-based request/response with correlation and timeout.
  • Persistence defaults to off. Outbox, inbox, and stream checkpointing are enabled per endpoint.
  • Published outbox cleanup must remain opt-in through explicit retention.
  • Newtonsoft.Json should not return to the core package.
  • New adapters should first implement capability mapping, then Queue/Topic/Request, then Stream if the engine has a real stream/log model.
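
The Queue and Topic definitions in the list above can be contrasted with a small in-memory model. DeliverQueue and DeliverTopic are hypothetical helpers, not core API, and the round-robin choice of consumer is an assumption made for the sketch; real brokers only guarantee that each queued message goes to one consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Queue: each message is handled by exactly one consumer (round-robin is an assumption).
static Dictionary<string, List<string>> DeliverQueue(IReadOnlyList<string> consumers, IEnumerable<string> messages)
{
    var inbox = consumers.ToDictionary(c => c, _ => new List<string>());
    var next = 0;
    foreach (var message in messages)
        inbox[consumers[next++ % consumers.Count]].Add(message);
    return inbox;
}

// Topic: every subscriber receives its own copy of each message.
static Dictionary<string, List<string>> DeliverTopic(IReadOnlyList<string> subscribers, IEnumerable<string> messages)
{
    var all = messages.ToList();
    return subscribers.ToDictionary(s => s, _ => new List<string>(all));
}

var orders = new[] { "order-1", "order-2", "order-3", "order-4" };
var queued = DeliverQueue(new[] { "worker-a", "worker-b" }, orders);
var fanned = DeliverTopic(new[] { "billing", "shipping" }, orders);

Console.WriteLine(queued.Values.Sum(list => list.Count)); // 4: each message handled once
Console.WriteLine(fanned.Values.Sum(list => list.Count)); // 8: each subscriber gets all four
```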

Compatible target frameworks

.NET: net8.0, net9.0, and net10.0 are compatible. For each of them, the platform-specific variants (-android, -browser, -ios, -maccatalyst, -macos, -tvos, -windows) were computed.

Version Downloads Last Updated
2.0.1 48 5/11/2026
2.0.0 51 5/9/2026