HoneyDrunk.Transport.StorageQueue
0.3.0
dotnet add package HoneyDrunk.Transport.StorageQueue --version 0.3.0
NuGet\Install-Package HoneyDrunk.Transport.StorageQueue -Version 0.3.0
<PackageReference Include="HoneyDrunk.Transport.StorageQueue" Version="0.3.0" />
<PackageVersion Include="HoneyDrunk.Transport.StorageQueue" Version="0.3.0" />
<PackageReference Include="HoneyDrunk.Transport.StorageQueue" />
paket add HoneyDrunk.Transport.StorageQueue --version 0.3.0
#r "nuget: HoneyDrunk.Transport.StorageQueue, 0.3.0"
#:package HoneyDrunk.Transport.StorageQueue@0.3.0
#addin nuget:?package=HoneyDrunk.Transport.StorageQueue&version=0.3.0
#tool nuget:?package=HoneyDrunk.Transport.StorageQueue&version=0.3.0
HoneyDrunk.Transport
Reliable messaging and outbox infrastructure for the Hive. Transport unifies brokers, queues, and event buses under one contract, ensuring delivery, order, and idempotence. It powers communication between Nodes (Data, Pulse, Vault, and beyond) so every message finds its way.
Signal Quote: "Every message finds its way."
What Is This?
HoneyDrunk.Transport is the messaging backbone of HoneyDrunk.OS ("the Hive"). It provides a transport-agnostic abstraction layer over different message brokers with built-in resilience, observability, and exactly-once semantics:
- Transport Abstraction - Unified ITransportPublisher and ITransportConsumer over Azure Service Bus, Azure Storage Queue, and InMemory
- Middleware Pipeline - Onion-style processing with logging, telemetry, correlation, and retry
- Envelope Pattern - Immutable ITransportEnvelope with correlation/causation tracking
- Transactional Outbox - Exactly-once processing with database transactions
- Kernel Integration - Uses TimeProvider and IGridContext from HoneyDrunk.Kernel for deterministic timestamps and distributed context
- Observability - OpenTelemetry spans and pluggable ITransportMetrics
- Blob Fallback for Service Bus - Persist failed publishes to Azure Blob Storage for later replay
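To make "onion-style" concrete, here is a minimal, self-contained sketch of how such a pipeline can be composed. Every type in it (MessageDelegate, ISketchMiddleware, RecordingMiddleware, SketchPipeline) is a hypothetical name invented for illustration. It is not the HoneyDrunk.Transport API, whose IMessagePipeline and IMessageMiddleware contracts may look quite different.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical names for illustration only; not the HoneyDrunk.Transport API.
public delegate Task MessageDelegate(string message);

public interface ISketchMiddleware
{
    Task InvokeAsync(string message, MessageDelegate next);
}

// Records when it runs so the nesting order is visible.
public sealed class RecordingMiddleware(string name, List<string> log) : ISketchMiddleware
{
    public async Task InvokeAsync(string message, MessageDelegate next)
    {
        log.Add($"{name}:before"); // runs on the way in
        await next(message);       // hand off to the next layer
        log.Add($"{name}:after");  // runs on the way out
    }
}

public static class SketchPipeline
{
    // Wraps the handler from the inside out, so the first registered
    // middleware ends up as the outermost layer of the onion.
    public static MessageDelegate Build(
        IReadOnlyList<ISketchMiddleware> middleware,
        MessageDelegate handler)
    {
        var next = handler;
        for (var i = middleware.Count - 1; i >= 0; i--)
        {
            var current = middleware[i];
            var inner = next;
            next = message => current.InvokeAsync(message, inner);
        }
        return next;
    }
}
```

With two layers registered as "logging" then "retry", a message passes through logging, then retry, then the handler, and unwinds in the reverse order. This is why cross-cutting concerns such as correlation or telemetry are usually registered first: they then wrap everything that follows.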
Quick Start
Installation
<ItemGroup>
  <PackageReference Include="HoneyDrunk.Transport" Version="0.1.0" />
  <PackageReference Include="HoneyDrunk.Transport.AzureServiceBus" Version="0.1.0" />
  <PackageReference Include="HoneyDrunk.Transport.StorageQueue" Version="0.1.0" />
  <PackageReference Include="HoneyDrunk.Transport.InMemory" Version="0.1.0" />
</ItemGroup>
Configure in Program.cs
using HoneyDrunk.Kernel.DependencyInjection;
using HoneyDrunk.Transport.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// 1. Register Kernel node
builder.Services.AddHoneyDrunkCoreNode(nodeDescriptor);

// 2. Register Transport core
builder.Services.AddHoneyDrunkTransportCore(options =>
{
    options.EnableTelemetry = true;
    options.EnableLogging = true;
    options.EnableCorrelation = true;
});

// 3. Choose a transport
// Azure Service Bus
builder.Services.AddHoneyDrunkServiceBusTransport(options =>
{
    options.FullyQualifiedNamespace = "mynamespace.servicebus.windows.net";
    options.Address = "orders";
    options.EntityType = ServiceBusEntityType.Topic;
    options.SubscriptionName = "order-processor";
    options.MaxConcurrency = 10;
    options.PrefetchCount = 20;
    options.ServiceBusRetry.Mode = ServiceBusRetryMode.Exponential;
    options.ServiceBusRetry.MaxRetries = 3;
});

// OR Azure Storage Queue
builder.Services
    .AddHoneyDrunkTransportStorageQueue(
        builder.Configuration["StorageQueue:ConnectionString"]!,
        "orders")
    .WithMaxDequeueCount(5)
    .WithConcurrency(10);

// 4. Register message handlers
builder.Services.AddMessageHandler<OrderCreatedEvent, OrderCreatedHandler>();

var app = builder.Build();
app.Run();
Usage Examples
Publishing Messages
public class OrderService(
    ITransportPublisher publisher,
    EnvelopeFactory envelopeFactory,
    IMessageSerializer serializer,
    IGridContext gridContext)
{
    public async Task CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
    {
        // Create order...

        // Publish event
        var @event = new OrderCreatedEvent { OrderId = orderId, Total = total };
        var payload = serializer.Serialize(@event);
        var envelope = envelopeFactory.CreateEnvelopeWithGridContext<OrderCreatedEvent>(
            payload, gridContext);

        await publisher.PublishAsync(
            envelope,
            EndpointAddress.Create("orders", "orders-topic"),
            ct);
    }
}
Handling Messages
public class OrderCreatedHandler : IMessageHandler<OrderCreatedEvent>
{
    private readonly ILogger<OrderCreatedHandler> _logger;

    public OrderCreatedHandler(ILogger<OrderCreatedHandler> logger)
    {
        _logger = logger;
    }

    public async Task<MessageProcessingResult> HandleAsync(
        OrderCreatedEvent message,
        MessageContext context,
        CancellationToken cancellationToken)
    {
        var grid = context.GridContext;

        _logger.LogInformation(
            "Processing order {OrderId} with CorrelationId {CorrelationId} on Node {NodeId}",
            message.OrderId,
            grid?.CorrelationId,
            grid?.NodeId);

        await SendConfirmationEmailAsync(message.OrderId, cancellationToken);

        return MessageProcessingResult.Success;
    }
}
Transactional Outbox
public class OrderService(
    IOutboxStore outboxStore,
    EnvelopeFactory factory,
    IMessageSerializer serializer,
    IDbContext dbContext)
{
    public async Task CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
    {
        await using var transaction = await dbContext.BeginTransactionAsync(ct);
        try
        {
            // Save order to database
            var order = new Order { /* ... */ };
            await dbContext.Orders.AddAsync(order, ct);

            // Save message to outbox (same transaction)
            var payload = serializer.Serialize(new OrderCreatedEvent { OrderId = order.Id });
            var envelope = factory.CreateEnvelope<OrderCreatedEvent>(payload);
            var destination = EndpointAddress.Create("orders", "orders-topic");
            await outboxStore.SaveAsync(destination, envelope, ct);

            await dbContext.SaveChangesAsync(ct);
            await transaction.CommitAsync(ct);
            // DefaultOutboxDispatcher publishes from outbox in background
        }
        catch
        {
            await transaction.RollbackAsync(ct);
            throw;
        }
    }
}
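The background half of the outbox pattern (reading saved entries and publishing them) is not shown above. The sketch below illustrates the general idea with an in-memory store. OutboxEntry, InMemoryOutbox, and OutboxDispatchSketch are hypothetical names made up for this example. The actual behaviour of DefaultOutboxDispatcher and IOutboxStore is not documented here and may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical types for illustration only; not the HoneyDrunk.Transport API.
public sealed record OutboxEntry(Guid Id, string Destination, string Payload)
{
    public bool Dispatched { get; set; }
}

public sealed class InMemoryOutbox
{
    private readonly List<OutboxEntry> _entries = new();

    public void Save(string destination, string payload) =>
        _entries.Add(new OutboxEntry(Guid.NewGuid(), destination, payload));

    public IReadOnlyList<OutboxEntry> Pending() =>
        _entries.Where(e => !e.Dispatched).ToList();
}

public static class OutboxDispatchSketch
{
    // Publishes each pending entry and marks it dispatched only after the
    // publish succeeds. A failed publish leaves the entry pending, so a
    // later pass picks it up again.
    public static async Task<int> DispatchPendingAsync(
        InMemoryOutbox outbox,
        Func<OutboxEntry, CancellationToken, Task> publish,
        CancellationToken ct = default)
    {
        var sent = 0;
        foreach (var entry in outbox.Pending())
        {
            try
            {
                await publish(entry, ct);
                entry.Dispatched = true;
                sent++;
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                // Leave the entry pending for the next pass.
            }
        }
        return sent;
    }
}
```

In a dispatcher shaped like this one, a crash between the publish and the "dispatched" update causes the same entry to be published again on the next pass. That is the usual reason outbox-based designs are paired with idempotent handlers or duplicate detection on the consuming side.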
Features
Core Components
| Component | Purpose | Key Types |
|---|---|---|
| Transport Abstraction | Unified publisher/consumer interface | ITransportPublisher, ITransportConsumer |
| Message Pipeline | Middleware execution engine | IMessagePipeline, IMessageMiddleware |
| Envelope System | Immutable message wrapping | ITransportEnvelope, EnvelopeFactory |
| Grid Context | Correlation/causation tracking | IGridContext, IGridContextFactory |
| Serialization | Pluggable message serialization | IMessageSerializer, JsonMessageSerializer |
| Outbox Pattern | Transactional outbox support | IOutboxStore, DefaultOutboxDispatcher |
Kernel Integration
HoneyDrunk.Transport extends HoneyDrunk.Kernel with messaging primitives:
| Kernel Service | How Transport Uses It |
|---|---|
| TimeProvider | Deterministic message timestamps via EnvelopeFactory |
| IGridContext | Correlation, causation, Node/Studio/Tenant propagation |
| IGridContextFactory | Creates Grid context for outbound messages |
| ILogger<T> | Structured logging throughout pipeline |
| IMeterFactory | OpenTelemetry metrics via ITransportMetrics |
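As a rough illustration of the first two rows, the sketch below shows how an injected TimeProvider yields deterministic timestamps and how correlation and causation IDs are commonly propagated from a parent message to a child. SketchEnvelope, FixedTimeProvider, and SketchEnvelopeFactory are hypothetical names. Only System.TimeProvider is a real .NET type, and the real EnvelopeFactory and IGridContext may work differently.

```csharp
using System;

// Hypothetical types for illustration only; not the HoneyDrunk.Transport API.
public sealed record SketchEnvelope(
    string MessageId,
    string CorrelationId,
    string? CausationId,
    DateTimeOffset Timestamp);

// A clock that always returns the same instant, so tests get stable timestamps.
public sealed class FixedTimeProvider(DateTimeOffset now) : TimeProvider
{
    public override DateTimeOffset GetUtcNow() => now;
}

public sealed class SketchEnvelopeFactory(TimeProvider clock)
{
    // Starts a new conversation: the message is its own correlation root.
    public SketchEnvelope CreateRoot()
    {
        var id = Guid.NewGuid().ToString("N");
        return new SketchEnvelope(id, id, null, clock.GetUtcNow());
    }

    // Continues a conversation: keep the correlation ID and point the
    // causation ID at the message that triggered this one.
    public SketchEnvelope CreateCausedBy(SketchEnvelope parent) =>
        new(Guid.NewGuid().ToString("N"), parent.CorrelationId, parent.MessageId, clock.GetUtcNow());
}
```

In production code the factory would typically receive TimeProvider.System. In tests, a fixed clock such as the one above makes envelope timestamps reproducible.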
Available Transports
| Transport | Package | Status |
|---|---|---|
| Azure Service Bus | HoneyDrunk.Transport.AzureServiceBus | Available |
| Azure Storage Queue | HoneyDrunk.Transport.StorageQueue | Available |
| In-Memory | HoneyDrunk.Transport.InMemory | Available (Testing) |
| RabbitMQ | HoneyDrunk.Transport.RabbitMQ | Planned |
| Kafka | HoneyDrunk.Transport.Kafka | Planned |
Testing
Use InMemory transport and DI for tests:
var services = new ServiceCollection();
services.AddHoneyDrunkCoreNode(TestNodeDescriptor);
services.AddHoneyDrunkTransportCore()
    .AddHoneyDrunkInMemoryTransport();

services.AddMessageHandler<OrderCreatedEvent>((msg, ctx, ct) =>
{
    // Assert in handler
    return Task.FromResult(MessageProcessingResult.Success);
});

await using var provider = services.BuildServiceProvider();
var broker = provider.GetRequiredService<InMemoryBroker>();
var publisher = provider.GetRequiredService<ITransportPublisher>();
var pipeline = provider.GetRequiredService<IMessagePipeline>();
// Use broker for broker-level tests, pipeline for pipeline-level tests
See Testing.md for complete patterns including unit tests, integration tests, and test helpers.
Documentation
| Document | Description |
|---|---|
| Architecture.md | High-level architecture and design principles |
| Abstractions.md | Core contracts: ITransportEnvelope, IMessageHandler, MessageContext |
| Pipeline.md | Middleware pipeline and built-in middleware |
| Configuration.md | All options: TransportCoreOptions, RetryOptions, error strategies |
| Context.md | Grid context propagation and IGridContextFactory |
| Primitives.md | EnvelopeFactory, TransportEnvelope, serialization |
| AzureServiceBus.md | Service Bus transport: sessions, topics, blob fallback |
| StorageQueue.md | Storage Queue transport: concurrency model, poison queues |
| InMemory.md | InMemory transport for testing |
| Outbox.md | Transactional outbox pattern |
| Runtime.md | ITransportRuntime and consumer lifecycle |
| Health.md | Health monitoring with ITransportHealthContributor |
| Metrics.md | ITransportMetrics and OpenTelemetry integration |
| Testing.md | Test patterns and helpers |
Repository Layout
HoneyDrunk.Transport/
├── HoneyDrunk.Transport/                   # Core abstractions & pipeline
│   ├── Abstractions/                       # Contracts & interfaces
│   ├── Pipeline/                           # Middleware execution engine
│   ├── Configuration/                      # Options & settings
│   ├── Context/                            # Grid context integration
│   ├── Primitives/                         # Envelope & factory
│   ├── Outbox/                             # Transactional outbox
│   ├── Runtime/                            # ITransportRuntime host
│   ├── Health/                             # Health contributors
│   ├── Metrics/                            # ITransportMetrics
│   ├── Telemetry/                          # OpenTelemetry integration
│   └── DependencyInjection/                # DI registration
├── HoneyDrunk.Transport.AzureServiceBus/   # Azure Service Bus provider
├── HoneyDrunk.Transport.StorageQueue/      # Azure Storage Queue provider
├── HoneyDrunk.Transport.InMemory/          # In-memory provider
├── HoneyDrunk.Transport.Tests/             # Test project
└── docs/                                   # Documentation
Storage Queue vs Service Bus
| Scenario | Storage Queue | Service Bus |
|---|---|---|
| Cost optimization | Yes ($0.0004/10K ops) | Higher cost |
| High volume (millions/day) | Excellent | Good |
| Simple queue semantics | Yes | Yes |
| Message size < 64KB | Yes | Yes (up to 100MB on the Premium tier) |
| Topics/subscriptions (fan-out) | No | Yes |
| Sessions (ordered processing) | No | Yes |
| Transactions | No | Yes |
| Duplicate detection | No | Yes |
Choose Storage Queue for cost-effective, high-volume, simple queue scenarios.
Choose Service Bus for enterprise messaging with topics, sessions, or transactions.
License
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Azure.Storage.Queues (>= 12.24.0)
  - HoneyDrunk.Kernel (>= 0.3.0)
  - HoneyDrunk.Kernel.Abstractions (>= 0.3.0)
  - HoneyDrunk.Transport (>= 0.3.0)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
  - Microsoft.Extensions.Options (>= 10.0.0)
  - Microsoft.Extensions.Options.DataAnnotations (>= 10.0.0)
v0.3.0: BREAKING - Kernel v0.3.0 integration with TenantId/ProjectId in JSON envelope schema. Drain queues before upgrading. See CHANGELOG.md.