Gizza.MessageBroker.Persistence.SqlServer
2.0.1
dotnet add package Gizza.MessageBroker.Persistence.SqlServer --version 2.0.1
Gizza.MessageBroker
Gizza.MessageBroker is a high-performance, engine-agnostic messaging library for .NET. It provides a small core API for common messaging patterns while keeping broker-specific behavior inside adapter packages.
The project was designed as a lightweight alternative to large messaging frameworks when you want direct control over broker clients, topology, persistence, retries, and runtime behavior without depending on MassTransit.
Current Status
The core API is implemented. RabbitMQ, NATS, Kafka, AWS SQS/SNS, Azure Service Bus, Azure Event Hubs, Google Cloud Pub/Sub, Apache ActiveMQ, and in-memory adapters are available. The in-memory adapter is intended for tests and local development, not as a production broker replacement.
| Area | Status |
|---|---|
| Core API | Available |
| RabbitMQ adapter | Available |
| SQL Server persistence | Available |
| PostgreSQL persistence | Available |
| SQLite persistence | Available |
| Oracle persistence | Available |
| NATS adapter | Available |
| Kafka adapter | Available |
| AWS SQS/SNS adapter | Available |
| Azure Service Bus adapter | Available |
| Azure Event Hubs adapter | Available |
| Google Cloud Pub/Sub adapter | Available |
| Apache ActiveMQ adapter | Available |
| In-memory adapter | Available |
All library projects currently target:
net8.0;net9.0;net10.0
The sample application targets .NET 10.
Package Layout
Applications should code against the core package and add only the adapter/provider packages they need.
| Package | Purpose |
|---|---|
| Gizza.MessageBroker | Core abstractions, endpoint models, serializers, BrokerFactory, DI registration, diagnostics hooks, in-memory inbox/outbox stores |
| Gizza.MessageBroker.RabbitMQ | RabbitMQ transport adapter using the official RabbitMQ .NET client |
| Gizza.MessageBroker.Nats | NATS transport adapter using the official NATS .NET client |
| Gizza.MessageBroker.Kafka | Apache Kafka transport adapter using Confluent.Kafka |
| Gizza.MessageBroker.Aws | AWS SQS/SNS transport adapter using the official AWS SDK for .NET |
| Gizza.MessageBroker.Azure | Azure Service Bus and Event Hubs transport adapter using the official Azure SDK packages |
| Gizza.MessageBroker.GoogleCloud | Google Cloud Pub/Sub transport adapter using the official Google Cloud .NET client |
| Gizza.MessageBroker.ActiveMQ | Apache ActiveMQ Classic OpenWire adapter using the official Apache NMS ActiveMQ client |
| Gizza.MessageBroker.Persistence | Database-engine-agnostic durable outbox/inbox/stream checkpoint infrastructure, SQL store base classes, cleanup jobs, Dapper-based implementation |
| Gizza.MessageBroker.Persistence.SqlServer | SQL Server persistence provider |
| Gizza.MessageBroker.Persistence.PostgreSql | PostgreSQL persistence provider |
| Gizza.MessageBroker.Persistence.Sqlite | SQLite persistence provider |
| Gizza.MessageBroker.Persistence.Oracle | Oracle persistence provider |
| Gizza.MessageBroker.InMemory | In-memory adapter for Queue, Topic, Stream, and Request contracts, intended for tests and local development |
| Gizza.MessageBroker.Samples | Console sample for Queue, Topic, Stream, Request, smoke testing, and DI smoke testing |
This mirrors the common NuGet pattern used by libraries such as Serilog: the core package defines the public programming model, and adapter packages provide concrete engines.
Messaging Patterns
Gizza.MessageBroker intentionally uses broker-neutral names in the public API. Application code describes the messaging intent; each adapter translates that intent to RabbitMQ, NATS, Kafka, AWS, Azure, Google Cloud, or another engine.
Queue
Use Queue when each published message should be handled by only one worker from a group of competing consumers.
Think of a Queue as a shared work line. Many services or many instances of the same service can listen to the same logical queue, but each individual message is assigned to one of them. This is the right model for commands and jobs where duplicate parallel handling would be wrong or wasteful.
Typical examples:
- Send an email.
- Process an order command.
- Run an import/export job.
- Execute one unit of background work.
Example: five worker instances listen to orders.submit. If 100 messages are published, the workers share those 100 messages between them. A single message is not broadcast to all five workers.
Gizza.MessageBroker API:
- Publisher: IQueuePublisher
- Consumer: IQueueConsumer
- Options: QueueEndpointOptions
Engine mapping:
- RabbitMQ: exchange + queue + binding, consumed by competing consumers.
- NATS: subject + queue group.
- Kafka: topic + shared group.id.
- AWS: SQS queue.
- Azure: Service Bus queue.
- Google Cloud: Pub/Sub topic + shared subscription.
- ActiveMQ: JMS queue with competing consumers.
Topic
Use Topic when every logical subscriber should receive every published message.
Topic is fanout/pub-sub. The publisher does not care how many subscribers exist. Each subscriber represents an independent interest in the same event. If three services subscribe to the same topic, all three should see the same event once.
Typical examples:
- Publish OrderCreated.
- Notify accounting, risk, and analytics services.
- Broadcast a strategy signal to multiple independent listeners.
- Send domain events to several bounded contexts.
Example: orders.created is published once. Billing records an invoice, analytics updates metrics, and notification sends an email. These are different subscribers, so all of them receive the message.
Gizza.MessageBroker API:
- Publisher: ITopicPublisher
- Subscriber: ITopicSubscriber
- Options: TopicEndpointOptions
Engine mapping:
- RabbitMQ: exchange + one subscriber queue per subscription.
- NATS: normal subject subscription, no queue group.
- Kafka: same topic, different group.id per logical subscriber.
- AWS: SNS topic + SQS subscription.
- Azure: Service Bus topic + subscription.
- Google Cloud: Pub/Sub topic + one subscription per logical subscriber.
- ActiveMQ: JMS topic, usually with durable subscriptions for named subscribers.
Important distinction: a Topic is live fanout. It is not primarily an event log. If replay, retention, or offset control matters, use Stream.
Request
Use Request when one component needs a response from another component through the broker.
Request is broker-based RPC. A client sends a request message with correlation metadata, a handler processes it, and a response is routed back to the waiting client. This is useful when the caller wants a direct answer but the communication should still flow through the messaging infrastructure instead of HTTP.
Typical examples:
- Ask a pricing service to calculate a value.
- Ask a risk service whether an order is acceptable.
- Run a command that must return a result to the caller.
- Query a service that is only reachable through broker infrastructure.
Example: a trading strategy sends CalculatePositionSize. A risk service handles the request and replies with PositionSizeCalculated. The client waits until the response arrives or the timeout expires.
Gizza.MessageBroker API:
- Client: IRequestClient
- Handler: IRequestHandler
- Options: RequestEndpointOptions
Engine mapping:
- RabbitMQ: request queue + ReplyTo + CorrelationId.
- NATS: native request/reply.
- Kafka: request topic + reply topic + correlation metadata.
- AWS: SQS request queue + reply queue + correlation metadata.
- Azure: Service Bus request queue + reply queue + correlation metadata.
- Google Cloud: Pub/Sub request topic + temporary reply topic/subscription + correlation metadata.
- ActiveMQ: request queue + temporary reply queue + correlation metadata.
Request should be used carefully. It introduces waiting, timeout handling, and tighter coupling than Queue or Topic. For high-volume fire-and-forget workflows, Queue or Topic is usually a better fit.
Stream
Use Stream when messages must be durable, ordered enough for the use case, and replayable from a known position.
Stream is an event log abstraction. Consumers can start from the next message, the first retained message, the last message, a numeric offset, or a timestamp, depending on engine support. This is the right model when the history matters, not only the live delivery.
Typical examples:
- Market tick history.
- Order book updates.
- Audit/event feeds.
- Event-sourced state rebuilds.
- Analytics consumers that may need to replay data.
Example: a market data publisher writes every BTCUSDT tick to market.ticks. A live strategy starts from Next, while an analytics service starts from First or from a timestamp to rebuild its local state.
Gizza.MessageBroker API:
- Publisher: IStreamPublisher
- Consumer: IStreamConsumer
- Options: StreamEndpointOptions
Engine mapping:
- RabbitMQ: RabbitMQ stream queue.
- NATS: JetStream stream + durable consumer.
- Kafka: topic + offset assignment/consumer group.
- AWS: Kinesis would fit this model, but it is outside the first AWS phase.
- Azure: Event Hubs + consumer group checkpointing.
- Google Cloud: outside the Pub/Sub adapter scope; Pub/Sub is modeled as Queue, Topic, and Request here.
- ActiveMQ: outside the first ActiveMQ phase unless a provider-specific log/stream model is introduced.
Stream consumers can choose a replay start position:
var options = new StreamEndpointOptions
{
Address = BrokerAddress.Stream("market.ticks"),
ConsumerName = "analytics-reader",
StartPosition = StreamStartPosition.First,
Checkpoint = StreamCheckpointOptions.StoreBacked(new InMemoryStreamCheckpointStore())
};
Checkpointing is optional. When enabled, adapters that support it can save progress after successful Ack handling and resume from the last stored position on restart. Use InMemoryStreamCheckpointStore for process-local tests, or provider stores such as PostgreSqlStreamCheckpointStore, SqlServerStreamCheckpointStore, SqliteStreamCheckpointStore, and OracleStreamCheckpointStore when progress must survive process restarts.
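As a sketch, a durable checkpoint setup might look like the following. Only the store type names appear in this README, so the assumption that PostgreSqlStreamCheckpointStore takes the same provider options object as the other PostgreSQL stores is exactly that: an assumption.

```csharp
// Sketch: store-backed checkpointing that survives process restarts.
// Assumption: PostgreSqlStreamCheckpointStore accepts the same provider
// options instance used by the PostgreSQL outbox/inbox stores below.
var checkpointStore = new PostgreSqlStreamCheckpointStore(postgreSqlOptions);

var options = new StreamEndpointOptions
{
    Address = BrokerAddress.Stream("market.ticks"),
    ConsumerName = "analytics-reader",
    // First run reads from the beginning; later runs resume from the
    // last position saved after successful Ack handling.
    StartPosition = StreamStartPosition.First,
    Checkpoint = StreamCheckpointOptions.StoreBacked(checkpointStore)
};
```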
Important distinction: a Stream is not just a Topic with more subscribers. Topic answers "who should receive this live event?" Stream answers "what durable log should this message be appended to, and where should each consumer read from?"
Quick Start with RabbitMQ
Install the core package and the RabbitMQ adapter:
dotnet add package Gizza.MessageBroker
dotnet add package Gizza.MessageBroker.RabbitMQ
For local development from this repository, start RabbitMQ:
docker compose -f tests\docker-compose.yml up -d rabbitmq
RabbitMQ defaults used by the sample and integration tests:
Host: 127.0.0.1
Port: 5673
Username: gizza
Password: gizza
Connection string: amqp://gizza:gizza@127.0.0.1:5673/
Management UI: http://127.0.0.1:15673
Quick Start with NATS
Install the core package and the NATS adapter:
dotnet add package Gizza.MessageBroker
dotnet add package Gizza.MessageBroker.Nats
For local development from this repository, start NATS:
docker compose -f tests\docker-compose.yml --profile messaging up -d nats
NATS defaults used by integration tests:
Url: nats://127.0.0.1:14222
Monitoring: http://127.0.0.1:18222
Quick Start with Kafka
Install the core package and the Kafka adapter:
dotnet add package Gizza.MessageBroker
dotnet add package Gizza.MessageBroker.Kafka
For local development from this repository, start Kafka:
docker compose -f tests\docker-compose.yml --profile messaging up -d kafka
Kafka defaults used by integration tests:
Bootstrap servers: 127.0.0.1:19092
Container-internal bootstrap: kafka:9092
Basic Usage Examples
The examples below use RabbitMQ, but the same core endpoint code works with NATS or Kafka when the transport and broker options use BrokerEngine.NATS or BrokerEngine.Kafka.
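As a sketch, switching engines touches only the broker options and the transport factory call; the endpoint code is unchanged. The assumption here is that for Kafka the bootstrap servers are passed through the connectionString parameter, which this README does not state explicitly.

```csharp
// Sketch: same endpoint code, different engine.
// Assumption: BuildBrokerOptions carries the Kafka bootstrap servers in
// connectionString; host/port/credentials may be ignored by the adapter.
var brokerOptions = BrokerFactory.BuildBrokerOptions(
    BrokerEngine.Kafka,
    host: "127.0.0.1",
    port: 19092,
    username: null,
    password: null,
    connectionString: "127.0.0.1:19092");

await using var transport = BrokerFactory.BuildQueueTransport(BrokerEngine.Kafka, brokerOptions);
// Endpoint options, publishers, and consumers are then built exactly as
// in the RabbitMQ examples that follow.
```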
Queue Publish
using Gizza.MessageBroker;
var brokerOptions = BrokerFactory.BuildBrokerOptions(
BrokerEngine.RabbitMQ,
host: "127.0.0.1",
port: 5673,
username: "gizza",
password: "gizza",
connectionString: "amqp://gizza:gizza@127.0.0.1:5673/");
await using var transport = BrokerFactory.BuildQueueTransport(BrokerEngine.RabbitMQ, brokerOptions);
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders")
};
var publisher = await transport.BuildQueuePublisherAsync(options);
await publisher.PublishAsync(new OrderCreated("BTCUSDT", 1));
public sealed record OrderCreated(string Symbol, int Quantity);
Queue Consume
using Gizza.MessageBroker;
await using var transport = BrokerFactory.BuildQueueTransport(BrokerEngine.RabbitMQ, brokerOptions);
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders")
};
var consumer = await transport.BuildQueueConsumerAsync(options, async (context, ct) =>
{
var message = options.Serializer.Deserialize<OrderCreated>(context.Message.Payload);
await HandleOrderAsync(message, ct);
return ConsumeResult.Ack();
});
await consumer.StartAsync();
Consumer acknowledgements are controlled by the handler result. Gizza.MessageBroker does not ACK before the handler completes.
Common results:
- ConsumeResult.Ack()
- ConsumeResult.Requeue("temporary failure")
- ConsumeResult.Nack("invalid message")
- ConsumeResult.DeadLetter("poison message")
- ConsumeResult.Defer(TimeSpan.FromSeconds(10), "retry later")
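A handler can map failure modes to these results. This is a sketch: the exception types are illustrative placeholders, with JsonException assuming the default System.Text.Json serializer described later in this README.

```csharp
// Sketch: choosing a ConsumeResult per failure mode.
var consumer = await transport.BuildQueueConsumerAsync(options, async (context, ct) =>
{
    try
    {
        var message = options.Serializer.Deserialize<OrderCreated>(context.Message.Payload);
        await HandleOrderAsync(message, ct);
        return ConsumeResult.Ack();
    }
    catch (System.Text.Json.JsonException)
    {
        // The payload cannot be parsed; redelivery will never succeed.
        return ConsumeResult.DeadLetter("poison message");
    }
    catch (TimeoutException)
    {
        // A downstream dependency hiccup; try again after a pause.
        return ConsumeResult.Defer(TimeSpan.FromSeconds(10), "retry later");
    }
});
```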
Delivery Order and Throughput
Endpoint consumption is ordered by default. QueueEndpointOptions, TopicEndpointOptions, StreamEndpointOptions, and RequestEndpointOptions default to PrefetchCount = 1; when MaxConcurrentMessages is not set, adapters run one handler at a time. RabbitMQ also defaults RabbitMqBrokerOptions.ConsumerDispatchConcurrency to 1, so the official client dispatch layer does not reorder handler callbacks. For a single publisher or a broker-defined stream/log append order, handlers therefore observe messages in source order.
Applications that prefer throughput over strict handler order can opt into parallel consumption explicitly:
var options = new TopicEndpointOptions
{
Address = BrokerAddress.Topic("market.signals"),
PrefetchCount = 32,
MaxConcurrentMessages = 8
};
var brokerOptions = new RabbitMqBrokerOptions
{
ConsumerDispatchConcurrency = 8
};
PrefetchCount controls how many messages may be in flight from the broker. MaxConcurrentMessages controls how many handlers may run at the same time in the process. For RabbitMQ, ConsumerDispatchConcurrency controls how many deliveries the official client may dispatch concurrently before the adapter handler limit is applied. Once these settings are raised for parallel throughput, handler completion order and console/log output order are not guaranteed. With multiple concurrent publishers, the only stable ordering reference is the broker's accepted queue or stream order.
Topic Publish and Subscribe
await using var transport = BrokerFactory.BuildTopicTransport(BrokerEngine.RabbitMQ, brokerOptions);
var publisherOptions = new TopicEndpointOptions
{
Address = BrokerAddress.Topic("orders.events")
};
var subscriberOptions = new TopicEndpointOptions
{
Address = BrokerAddress.Topic("orders.events"),
SubscriptionName = "billing-service"
};
var subscriber = await transport.BuildTopicSubscriberAsync(subscriberOptions, (context, ct) =>
{
var message = subscriberOptions.Serializer.Deserialize<OrderCreated>(context.Message.Payload);
return Task.FromResult(ConsumeResult.Ack());
});
await subscriber.StartAsync();
var publisher = await transport.BuildTopicPublisherAsync(publisherOptions);
await publisher.PublishAsync(new OrderCreated("ETHUSDT", 2));
Stream Publish and Consume
await using var transport = BrokerFactory.BuildStreamTransport(BrokerEngine.RabbitMQ, brokerOptions);
var options = new StreamEndpointOptions
{
Address = BrokerAddress.Stream("market.ticks"),
ConsumerName = "analytics-reader",
StartPosition = StreamStartPosition.First
};
var publisher = await transport.BuildStreamPublisherAsync(options);
await publisher.PublishAsync(new MarketTick("BTCUSDT", 95000m));
var consumer = await transport.BuildStreamConsumerAsync(options, (context, ct) =>
{
var tick = options.Serializer.Deserialize<MarketTick>(context.Message.Payload);
return Task.FromResult(ConsumeResult.Ack());
});
await consumer.StartAsync();
public sealed record MarketTick(string Symbol, decimal Price);
Request/Response
await using var transport = BrokerFactory.BuildRequestTransport(BrokerEngine.RabbitMQ, brokerOptions);
var options = new RequestEndpointOptions
{
Address = BrokerAddress.Request("pricing.calculate"),
Timeout = TimeSpan.FromSeconds(10)
};
var handler = await transport.BuildRequestHandlerAsync(options, (context, ct) =>
{
var request = options.Serializer.Deserialize<PricingRequest>(context.Message.Payload);
var response = new PricingResponse(request.Symbol, price: 100);
return Task.FromResult(BrokerMessage.FromPayload(options.Serializer.Serialize(response), options.Serializer.ContentType));
});
await handler.StartAsync();
var client = await transport.BuildRequestClientAsync(options);
var result = await client.RequestAsync<PricingRequest, PricingResponse>(
new PricingRequest("BTCUSDT"),
TimeSpan.FromSeconds(10));
public sealed record PricingRequest(string Symbol);
public sealed record PricingResponse(string Symbol, decimal Price);
Dependency Injection
The core package contains DI registration. Adapter packages remain implementation packages.
using Gizza.MessageBroker;
using Microsoft.Extensions.DependencyInjection;
services.AddGizzaMessageBroker(builder =>
{
builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
builder.AddHealthChecks();
});
Named transports are resolved lazily through IMessageBrokerTransportRegistry:
var registry = serviceProvider.GetRequiredService<IMessageBrokerTransportRegistry>();
var transport = registry.GetTransport("rabbit");
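Once resolved, a named transport can build endpoints directly. This sketch assumes the registry-resolved transport exposes the same Build…Async factory methods shown in the Basic Usage examples.

```csharp
// Sketch: building an endpoint from a lazily resolved named transport.
// Assumption: the registry returns a transport with the same factory
// surface as BrokerFactory.BuildQueueTransport(...).
var registry = serviceProvider.GetRequiredService<IMessageBrokerTransportRegistry>();
var transport = registry.GetTransport("rabbit");

var publisher = await transport.BuildQueuePublisherAsync(new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders")
});
await publisher.PublishAsync(new OrderCreated("BTCUSDT", 1));
```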
Typed publishers and request clients can be registered as injectable services:
services.AddGizzaMessageBroker(builder =>
{
builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
builder.AddQueuePublisher<OrderCreated>("rabbit", new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders")
});
builder.AddTopicPublisher<OrderCreated>("rabbit", new TopicEndpointOptions
{
Address = BrokerAddress.Topic("orders.events")
});
builder.AddRequestClient<PricingRequest, PricingResponse>("rabbit", new RequestEndpointOptions
{
Address = BrokerAddress.Request("pricing.calculate")
});
});
public sealed class OrderService
{
private readonly IQueuePublisher<OrderCreated> _publisher;
public OrderService(IQueuePublisher<OrderCreated> publisher)
{
_publisher = publisher;
}
public Task PublishAsync(OrderCreated message, CancellationToken ct)
{
return _publisher.PublishAsync(message, ct);
}
}
Outbox dispatcher and persistence cleanup hosted services can also be registered through the builder when persistence is enabled.
The builder can register a standard Microsoft health check named gizza_messagebroker. By default it checks every registered transport and, when the adapter implements IMessageBrokerHealthProbe, performs a real connectivity probe. The RabbitMQ adapter opens or validates its connection during this check.
services.AddGizzaMessageBroker(builder =>
{
builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
builder.AddHealthChecks(options =>
{
options.Name = "message-broker-ready";
options.AddTransport("rabbit");
});
});
Hosted consumers and request handlers can be registered with the same builder. They are started and stopped through the application host:
services.AddGizzaMessageBroker(builder =>
{
builder.AddTransport("rabbit", BrokerEngine.RabbitMQ, brokerOptions);
builder.AddHostedQueueConsumer(
"rabbit",
new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders")
},
async (context, ct) =>
{
var message = SystemTextJsonMessageSerializer.Default
.Deserialize<OrderCreated>(context.Message.Payload);
await HandleOrderAsync(message, ct);
return ConsumeResult.Ack();
});
});
Serialization
The default serializer is SystemTextJsonMessageSerializer, based on System.Text.Json.
Newtonsoft.Json is not used by the core library and no Newtonsoft compatibility layer is planned. Custom serialization can be added by implementing IMessageSerializer and assigning it to endpoint options.
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders"),
Serializer = SystemTextJsonMessageSerializer.Default
};
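A custom serializer can be sketched by inferring the IMessageSerializer surface (ContentType, Serialize, Deserialize&lt;T&gt;) from the calls shown elsewhere in this README; the exact member signatures and payload type are assumptions, and the MessagePack-CSharp package is used purely as an example format.

```csharp
// Sketch: a custom IMessageSerializer. The member set is inferred from
// options.Serializer.Serialize(...), .Deserialize<T>(...), and
// .ContentType usage in the examples above; signatures are assumptions.
public sealed class MessagePackMessageSerializer : IMessageSerializer
{
    public string ContentType => "application/x-msgpack";

    public byte[] Serialize<T>(T message)
        => MessagePack.MessagePackSerializer.Serialize(message);

    public T Deserialize<T>(ReadOnlyMemory<byte> payload)
        => MessagePack.MessagePackSerializer.Deserialize<T>(payload);
}

var options = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    Serializer = new MessagePackMessageSerializer()
};
```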
Reliability Features
The core model includes:
- Handler-controlled ACK/NACK
- Retry policy and backoff
- Delayed redelivery
- Dead-letter policy
- Fault message publishing
- Message TTL and expiration metadata
- Scheduled publish metadata
- Correlation, causation, conversation, and request ids
- Observer hooks for publish/consume diagnostics
- Optional endpoint-level inbox and outbox persistence
Persistence is intentionally disabled by default. High-frequency flows such as ticker data, order book updates, market data, and signals should usually stay in direct mode. Critical commands and state transitions can enable outbox and/or inbox per endpoint.
Persistence
Persistence is split into a database-neutral package and provider packages.
Install the common persistence package plus the database provider you need:
dotnet add package Gizza.MessageBroker.Persistence
dotnet add package Gizza.MessageBroker.Persistence.PostgreSql
Available providers:
- SQL Server
- PostgreSQL
- SQLite
- Oracle
The persistence layer supports:
- Durable outbox storage
- Durable inbox/idempotency storage
- Durable stream checkpoint storage
- Provider-specific schema creation
- Provider-specific SQL dialects
- Batch claim and lock strategies
- Expired lock reset
- Published outbox cleanup when explicit retention is configured
- Expired inbox cleanup
Published outbox cleanup is opt-in by retention. This is deliberate: the library will not delete published outbox records unless the application explicitly defines how long those records should be retained.
Endpoint-Level Outbox
var outboxStore = new PostgreSqlOutboxStore(postgreSqlOptions);
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders"),
Persistence = EndpointPersistenceOptions.OutboxOnly(
OutboxPersistenceOptions.StoreBacked(outboxStore))
};
When outbox is enabled, the endpoint publisher writes to the outbox store. OutboxDispatcher or OutboxDispatcherHostedService then publishes pending messages to the broker.
Endpoint-Level Inbox
var inboxStore = new PostgreSqlInboxStore(postgreSqlOptions);
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders"),
Persistence = EndpointPersistenceOptions.InboxOnly(
InboxPersistenceOptions.StoreBacked(inboxStore, "orders-worker"))
};
Inbox persistence is used for idempotent consume behavior. It is independent from outbox persistence.
RabbitMQ Adapter Notes
RabbitMQ currently supports:
- Queue pattern
- Topic pattern
- Stream pattern using RabbitMQ stream queues
- Request pattern
- Ordered-by-default consumers with opt-in prefetch and bounded handler concurrency
- Dead-letter routing
- Delayed redelivery through TTL retry queues
- Scheduled publish through delay queues
- Request timeout, cancellation, and late-response cleanup
- Role-separated RabbitMQ channels for topology, publishers, consumers, and request flows
- Automatic connection/channel recovery and consumer re-subscribe verified by controlled container restart integration tests
- Stream consumer identity plus first, last, next, numeric offset, and timestamp replay starts
RabbitMQ-specific details are configured through transport options:
using Gizza.MessageBroker.RabbitMQ;
var transportOptions = new RabbitMqTransportOptions()
.UseExchangeName("orders.exchange")
.UseQueueName("orders.queue")
.UseDeadLetter("orders.dead.exchange", "orders.dead")
.UseDelayedRedelivery()
.UseDelayedDelivery()
.ToTransportOptions();
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders", "orders.route"),
TransportOptions = transportOptions
};
The adapter keeps one connection per transport and separates channel ownership by responsibility. Topology declarations use a serialized topology channel; each publisher owns a publish channel; each consumer, subscriber, request client, and request handler owns its runtime channel. Publish and request-client operations are serialized per endpoint instance so a single injected publisher or request client can be used safely from concurrent callers.
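Because publish operations are serialized per endpoint instance, fan-out publishing needs no external locking. A minimal sketch:

```csharp
// Sketch: one injected publisher used from concurrent callers. The
// adapter serializes publish operations per endpoint instance, so no
// lock is needed around PublishAsync.
var tasks = Enumerable.Range(0, 10)
    .Select(i => publisher.PublishAsync(new OrderCreated("BTCUSDT", i)));
await Task.WhenAll(tasks);
```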
RabbitMQ connection recovery uses explicit broker options:
var brokerOptions = new RabbitMqBrokerOptions
{
ConnectionString = "amqp://gizza:gizza@127.0.0.1:5673/",
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
RequestedHeartbeat = TimeSpan.FromSeconds(30)
};
Kafka Adapter Notes
Kafka currently supports:
- Queue pattern with a shared consumer group
- Topic pattern with one consumer group per logical subscriber
- Stream pattern with first, last, next, numeric offset, and timestamp replay starts
- Request pattern with a reply topic and correlation metadata
- Optional endpoint outbox/inbox integration
- Ordered-by-default partition consumption with opt-in parallel handler execution controlled by MaxConcurrentMessages
- Retry/backoff through the shared RetryPolicyOptions contract
- Dead-letter topic publishing for Nack, DeadLetter, and exhausted retry results
- Health probing through Kafka metadata
Kafka-specific details are configured through transport options:
using Gizza.MessageBroker.Kafka;
var transportOptions = new KafkaTransportOptions()
.UseTopic("orders.created")
.UseConsumerGroup("orders-workers")
.UseDeadLetterTopic("orders.created.dlq")
.UseTopicTopology(partitionCount: 3, replicationFactor: 1)
.ToTransportOptions();
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders.created"),
TransportOptions = transportOptions
};
Kafka consumer acknowledgements map to offset commits. The adapter keeps Kafka commit, seek, pause, and resume calls on the consumer loop thread, while handlers can run in parallel across assigned partitions. A partition is paused while its active message is being handled, so ordering remains stable within that partition. Requeue, delay-free Defer, and handler exceptions can use RetryPolicyOptions for seek-based redelivery with backoff; exhausted retries are published to the configured dead-letter topic.
ActiveMQ Adapter Notes
The ActiveMQ adapter uses the official Apache NMS ActiveMQ OpenWire client. It currently supports Queue, Topic, durable topic subscriptions, Request/Reply with a temporary reply queue, and dead-letter copy for Nack / DeadLetter consume results.
ActiveMQ consumers use IndividualAcknowledge sessions. Ack acknowledges the source message, Requeue and Defer recover the session for broker redelivery, and Nack / DeadLetter publish a copy to the configured DLQ before acknowledging the source message. The Gizza envelope is preserved and gizza-dead-letter-reason is added to the copied message.
using Gizza.MessageBroker.ActiveMQ;
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders"),
DeadLetterPolicy = DeadLetterPolicyOptions.ToQueue("orders.dlq"),
TransportOptions = new ActiveMqTransportOptions()
.UseQueueName("orders")
.UseDeadLetterQueueName("orders.dlq")
.ToTransportOptions()
};
AWS Adapter Notes
The AWS adapter currently supports SQS queue publish/consume, SQS dead-letter redrive configuration, SQS request/reply, SNS topic publish, and SNS + SQS topic subscriptions.
SQS consumers poll with bounded concurrency. Ack and Nack delete the message from the queue. Requeue, Defer, and handler exceptions use ChangeMessageVisibility; Defer(TimeSpan) maps the delay to the message visibility timeout.
When DeadLetterPolicyOptions is configured, the consumer sets the SQS RedrivePolicy on the source queue. Nack and DeadLetter also publish a copy to the configured DLQ and then delete the source message, preserving the Gizza envelope and adding gizza-dead-letter-reason.
Topic subscribers use SNS fanout with one SQS queue per logical subscription. On start, the adapter resolves the queue ARN, optionally merges the required SNS sqs:SendMessage policy statement into the queue policy, subscribes the queue to the topic, and then polls SQS through the same consumer pipeline. Raw SNS delivery is enabled by default so the Gizza envelope and message attributes stay intact.
Request/reply uses a request SQS queue and a temporary reply SQS queue owned by the client. The client writes ReplyTo metadata into the Gizza envelope, polls the reply queue, and correlates responses by RequestId. The handler sends the response to the provided reply queue and ACKs the request only after the response publish succeeds.
AWS-specific details are configured through transport options:
using Gizza.MessageBroker.Aws;
var transportOptions = new AwsTransportOptions()
.UseQueueName("orders")
.UseQueuePolling(waitTimeSeconds: 1, visibilityTimeoutSeconds: 30, maxMessagesPerPoll: 1)
.ToTransportOptions();
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders"),
DeadLetterPolicy = DeadLetterPolicyOptions.ToQueue("orders.dlq"),
TransportOptions = transportOptions,
PrefetchCount = 1,
MaxConcurrentMessages = 1
};
SQS redrive count can be configured through AWS transport options:
var transportOptions = new AwsTransportOptions()
.UseQueueName("orders")
.UseSqsRedrive(maxReceiveCount: 5)
.ToTransportOptions();
FIFO metadata is opt-in:
var fifoOptions = new AwsTransportOptions()
.UseQueueName("orders.fifo")
.UseFifo(messageGroupId: "orders")
.ToTransportOptions();
SNS + SQS topic subscription:
var subscriberOptions = new TopicEndpointOptions
{
Address = BrokerAddress.Topic("orders.created", "billing"),
SubscriptionName = "billing",
DurableSubscription = true,
TransportOptions = new AwsTransportOptions()
.UseTopicArn("arn:aws:sns:us-east-1:123456789012:orders-created")
.UseQueueName("billing-orders-created")
.UseQueuePolling(waitTimeSeconds: 1, visibilityTimeoutSeconds: 30, maxMessagesPerPoll: 1)
.UseRawSnsDelivery()
.ManageSnsSqsQueuePolicy()
.ToTransportOptions()
};
SQS request/reply:
var options = new RequestEndpointOptions
{
Address = BrokerAddress.Request("pricing.requests"),
Timeout = TimeSpan.FromSeconds(10),
TransportOptions = new AwsTransportOptions()
.UseQueueName("pricing-requests")
.UseQueuePolling(waitTimeSeconds: 1, visibilityTimeoutSeconds: 30, maxMessagesPerPoll: 1)
.ToTransportOptions()
};
Google Cloud Pub/Sub Adapter Notes
The Google Cloud adapter currently supports Pub/Sub Queue, Topic, and Request workflows. Stream replay is intentionally not exposed by this adapter; use Kafka, NATS JetStream, RabbitMQ streams, or Azure Event Hubs when offset/replay semantics are required.
Queue maps to a Pub/Sub topic plus one shared subscription. Topic maps to a Pub/Sub topic plus one subscription per logical subscriber. Request/reply uses a request topic and a temporary reply topic/subscription owned by the client.
Message ordering is enabled by default through Pub/Sub ordering keys. If no explicit ordering key is provided, the adapter uses default, which favors deterministic delivery over maximum parallelism. Increase MaxConcurrentMessages, PrefetchCount, MaxMessagesPerPull, or disable ordering only when the endpoint can tolerate out-of-order handling.
using Gizza.MessageBroker.GoogleCloud;
var brokerOptions = new GoogleCloudBrokerOptions
{
ProjectId = "my-gcp-project"
};
var options = new QueueEndpointOptions
{
Address = BrokerAddress.Queue("orders"),
PrefetchCount = 1,
MaxConcurrentMessages = 1,
TransportOptions = new GoogleCloudTransportOptions()
.UseTopic("orders-topic")
.UseSubscription("orders-workers")
.UseOrdering()
.UsePulling(maxMessagesPerPull: 1, ackDeadlineSeconds: 10)
.ToTransportOptions()
};
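For endpoints that can tolerate out-of-order handling, the same builder supports a throughput-oriented configuration. A hedged sketch: the values below are illustrative, and it is assumed that simply omitting UseOrdering() leaves ordering disabled:

```csharp
// Illustrative throughput-oriented variant of the queue endpoint above.
// Assumption: not calling UseOrdering() disables ordering keys; the
// numeric values are examples, not recommendations.
var throughputOptions = new QueueEndpointOptions
{
    Address = BrokerAddress.Queue("orders"),
    PrefetchCount = 100,
    MaxConcurrentMessages = 16,
    TransportOptions = new GoogleCloudTransportOptions()
        .UseTopic("orders-topic")
        .UseSubscription("orders-workers")
        .UsePulling(maxMessagesPerPull: 50, ackDeadlineSeconds: 30)
        .ToTransportOptions()
};
```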
Local Development
Restore and build:
dotnet restore "Gizza Message Broker.slnx"
dotnet build "Gizza Message Broker.slnx" -c Release --nologo
Start test services:
docker compose -f tests\docker-compose.yml up -d rabbitmq sqlserver-init postgres oracle
docker compose -f tests\docker-compose.yml --profile messaging up -d nats kafka activemq
docker compose -f tests\docker-compose.yml --profile cloud up -d localstack google-pubsub azure-servicebus azure-eventhubs
Run tests:
dotnet test tests\Gizza.MessageBroker.UnitTests\Gizza.MessageBroker.UnitTests.csproj -c Release --no-build --nologo
dotnet test tests\Gizza.MessageBroker.IntegrationTests\Gizza.MessageBroker.IntegrationTests.csproj -c Release --no-build --nologo
The integration test project contains a reusable provider contract suite for Queue, Topic, Request, and Stream behavior. RabbitMQ, NATS, Kafka, AWS, Azure, Google Cloud, and ActiveMQ are wired into that suite where the provider supports the relevant pattern.
Run the BenchmarkDotNet suite:
dotnet run --project tests\Gizza.MessageBroker.Benchmarks\Gizza.MessageBroker.Benchmarks.csproj -c Release -f net10.0 -- --filter *
Run the sample:
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- smoke
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- di-smoke
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- stream-publish 25
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- stream-consume analytics-reader
Available sample commands:
queue-publish
queue-consume
topic-publish
topic-subscribe
request-client
request-handler
stream-publish
stream-consume
stream-replay
smoke
di-smoke
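The queue commands pair naturally across two terminals. A sketch of one possible workflow, assuming queue-consume and queue-publish take no extra arguments (the stream commands above show that some commands do accept them):

```shell
# Terminal 1: start a consumer for the sample queue.
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- queue-consume

# Terminal 2: publish messages to the same queue.
dotnet run --project src\Gizza.MessageBroker.Samples\Gizza.MessageBroker.Samples.csproj -c Release -f net10.0 -- queue-publish
```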
Roadmap
Near-term work:
- Keep Saga/orchestration preparation design-only until the orchestration package boundary is explicitly opened.
- Expand benchmarks with real broker scenarios as adapter-level tuning decisions are made.
Out of initial cloud scope:
- AWS EventBridge
- AWS Kinesis
Design Principles
- Keep the public API broker-neutral.
- Use official provider clients in adapter packages.
- Keep persistence optional and endpoint-scoped.
- Favor direct broker hot paths for high-frequency traffic.
- Use outbox/inbox only for flows that need durability or idempotency.
- Keep adapter-specific topology and protocol details out of core application code.
- Make each feature independently testable without requiring a full application host.
Notes for Future Maintainers and Codex
If you are revisiting this project later, this is the short mental model:
- Gizza.MessageBroker is the core API. Do not put RabbitMQ, Kafka, NATS, AWS, Azure, or ActiveMQ protocol logic there.
- Adapter packages translate core contracts to concrete broker operations.
- Persistence follows the same plugin shape: common API in Gizza.MessageBroker.Persistence, database-specific providers in separate packages.
- Queue means one consumer handles each message.
- Topic means every subscriber receives each message.
- Stream means durable/replayable event log.
- Request means broker-based request/response with correlation and timeout.
- Persistence defaults to off. Outbox, inbox, and stream checkpointing are enabled per endpoint.
- Published outbox cleanup must remain opt-in through explicit retention.
- Newtonsoft.Json should not return to the core package.
- New adapters should first implement capability mapping, then Queue/Topic/Request, then Stream if the engine has a real stream/log model.
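The four delivery patterns map onto the address factories already used throughout this README. A hedged sketch; BrokerAddress.Stream is assumed by analogy with the Queue/Topic/Request factories shown above and may differ in the actual API:

```csharp
// Queue: one competing consumer handles each message.
var queueAddress = BrokerAddress.Queue("orders");

// Topic: every named subscriber receives each message.
var topicAddress = BrokerAddress.Topic("orders.created", "billing");

// Request: broker-based request/response with correlation and timeout.
var requestAddress = BrokerAddress.Request("pricing.requests");

// Stream: durable, replayable event log. Factory name assumed; only
// Queue/Topic/Request appear verbatim in this README.
var streamAddress = BrokerAddress.Stream("analytics");
```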
| Product | Compatible and computed target frameworks |
|---|---|
| .NET | net8.0, net9.0, and net10.0 are compatible. Platform-specific targets for each (android, browser, ios, maccatalyst, macos, tvos, windows) were computed. |
Dependencies

net10.0
- Gizza.MessageBroker.Persistence (>= 2.0.1)
- Microsoft.Data.SqlClient (>= 7.0.1)

net8.0
- Gizza.MessageBroker.Persistence (>= 2.0.1)
- Microsoft.Data.SqlClient (>= 7.0.1)

net9.0
- Gizza.MessageBroker.Persistence (>= 2.0.1)
- Microsoft.Data.SqlClient (>= 7.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.