PubSea.Messaging.EntityFramework.Kafka
9.2.17
See the version list below for details.
dotnet add package PubSea.Messaging.EntityFramework.Kafka --version 9.2.17
NuGet\Install-Package PubSea.Messaging.EntityFramework.Kafka -Version 9.2.17
<PackageReference Include="PubSea.Messaging.EntityFramework.Kafka" Version="9.2.17" />
<PackageVersion Include="PubSea.Messaging.EntityFramework.Kafka" Version="9.2.17" />
<PackageReference Include="PubSea.Messaging.EntityFramework.Kafka" />
paket add PubSea.Messaging.EntityFramework.Kafka --version 9.2.17
#r "nuget: PubSea.Messaging.EntityFramework.Kafka, 9.2.17"
#:package PubSea.Messaging.EntityFramework.Kafka@9.2.17
#addin nuget:?package=PubSea.Messaging.EntityFramework.Kafka&version=9.2.17
#tool nuget:?package=PubSea.Messaging.EntityFramework.Kafka&version=9.2.17
PubSea Framework
Table of Contents
- Overview
- Solution Layout
- NuGet Packages and Features
- Getting Started
- Package Reference
- Design Notes
- Troubleshooting
- FAQ
- Build & Pack
- Contributing
- License
Overview
SeaFramework is an opinionated ASP.NET Core framework that applies Domain-Driven Design (DDD) patterns and provides production-ready utilities: a lightweight Mediator, an Entity Framework–backed Outbox/Inbox messaging module with pluggable brokers (Kafka, RabbitMQ, or a no‑op default), mapping helpers, health checks, error handling middleware, Snowflake/HashId services, file storage (MinIO), hybrid caching, and more.
Use packages together for a full stack, or reference only what you need.
Solution Layout
00-Framework: Core DDD and web utilities- Domain events and dispatcher (
IEvent,IEventHandler<T>,AddSeaEventDispatcher) - Unit of Work (
IUnitOfWork,IEfUnitOfWork,AddEfUnitOfWork<TDbContext>) and EF helpers - Middlewares (exception handler, web error handler) and API response helpers
- Health checks (cURL checks), HTTP error logging handler
- Mapster-based mapper (
AddSeaMapper,UseSeaMapper) - Snowflake/HashId/DateTime services
- File storage (MinIO) via
AddSeaFileStore - Utilities: caching extensions, SLO helpers, logging, helpers
- Domain events and dispatcher (
01-Messaging.EntityFramework: EF Core Outbox/Inbox messaging- Outbox model and interceptor, inbox store, background services:
OutboxPollingPublisher,OutboxMessageTransporter,InboxCleaner
- Auto‑discovery for
ISeaConsumer<TMessage>implementations - Dynamic message contract registration for interface-based payloads
- Outbox model and interceptor, inbox store, background services:
02-Messaging.EntityFramework.Kafka: Kafka broker provider03-Messaging.EntityFramework.RabbitMq: RabbitMQ broker provider04-Mediator: Lightweight mediator withSend/Publishand handler discoveryQualifiers/00-RestApi: runnable sample for wiring the framework in an ASP.NET Core appQualifiers/01-MessagingEfKafka: runnable sample for messaging with Kafka
NuGet Packages and Features
Each numbered project is produced as a NuGet package. Install only what you need.
Packages:
PubSea.Framework
- PackageId:
PubSea.Framework - Provides: DDD primitives, domain events/dispatcher, UoW, middlewares, API response helpers, mapper, health checks, HttpClient error logging, Snowflake/HashId/DateTime services, MinIO file store
- Install:
dotnet add package PubSea.Framework
- PackageId:
PubSea.Mediator
- PackageId:
PubSea.Mediator - Provides: Mediator abstractions and implementation; auto registration of handlers via DI
- Install:
dotnet add package PubSea.Mediator
- PackageId:
PubSea.Messaging.EntityFramework
- PackageId:
PubSea.Messaging.EntityFramework - Provides: EF Core Outbox/Inbox messaging,
ISeaPublisher, background services, consumer discovery, EF interceptor - Depends on:
PubSea.Framework - Install:
dotnet add package PubSea.Messaging.EntityFramework
- PackageId:
PubSea.Messaging.EntityFramework.Kafka
- PackageId:
PubSea.Messaging.EntityFramework.Kafka - Provides: Kafka broker transport/consume integration for the messaging module
- Depends on:
PubSea.Messaging.EntityFramework - Install:
dotnet add package PubSea.Messaging.EntityFramework.Kafka
- PackageId:
PubSea.Messaging.EntityFramework.RabbitMq
- PackageId:
PubSea.Messaging.EntityFramework.RabbitMq - Provides: RabbitMQ broker transport integration for the messaging module
- Depends on:
PubSea.Messaging.EntityFramework - Install:
dotnet add package PubSea.Messaging.EntityFramework.RabbitMq
- PackageId:
Feature to package mapping:
- Mediator (Send/Publish):
PubSea.Mediator - Domain events and dispatcher:
PubSea.Framework - API response envelope, middlewares, mapper, health checks, HttpClient logging, services (Snowflake/HashId/DateTime), MinIO file store:
PubSea.Framework - Messaging (Outbox/Inbox,
ISeaPublisher, consumers, background services):PubSea.Messaging.EntityFramework - Kafka transport:
PubSea.Messaging.EntityFramework.Kafka - RabbitMQ transport:
PubSea.Messaging.EntityFramework.RabbitMq
Getting started
Open the solution SeaFramework.sln in Visual Studio (or run dotnet build). You can reference projects directly or pack them as NuGet packages with dotnet pack (or pack.bat) and consume them from a local feed.
Minimal wiring (Web API)
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
// Mediator
builder.Services.AddSeaMediator();
// Domain events (auto-registers IEventHandler<T>)
builder.Services.AddSeaEventDispatcher();
// Mapper (Mapster)
builder.Services.AddSeaMapper();
// Optional utilities
builder.Services.AddSnowflakeService();
builder.Services.AddHashIdService();
builder.Services.AddDateTimeService();
var app = builder.Build();
app.UseSeaMapper();
app.UseSeaEventDispatcher();
app.MapControllers();
app.Run();
Mediator
public sealed record CreateUser(string Email) : ISeaRequest;
public sealed class CreateUserHandler : ISeaRequestHandler<CreateUser>
{
public Task Handle(CreateUser request, CancellationToken ct = default)
=> Task.CompletedTask;
}
public sealed record GetUser(long Id) : ISeaRequest<UserDto>;
public sealed class GetUserHandler : ISeaRequestHandler<GetUser, UserDto>
{
public Task<UserDto> Handle(GetUser request, CancellationToken ct = default)
=> Task.FromResult(new UserDto());
}
await mediator.Send(new CreateUser("a@b.com"), ct);
var dto = await mediator.Send<GetUser, UserDto>(new GetUser(1), ct);
await mediator.Publish(new CreateUser("c@d.com"), ct); // fan-out
Handlers are auto-registered from loaded assemblies by AddSeaMediator.
Domain Events
builder.Services.AddSeaEventDispatcher();
app.UseSeaEventDispatcher();
Implement IEventHandler<TEvent> anywhere in your assemblies. Handlers are discovered and registered automatically.
Messaging (EF Outbox/Inbox)
Add messaging and choose a broker provider. Your application DbContext is used for Outbox/Inbox; an EF interceptor persists outbox messages transactionally.
builder.Services.AddSeaMessaging<AppDbContext>(cfg =>
{
// For dev/local you can start with the default (no-op) broker:
// cfg.UseDefaultBroker();
// Tuning:
// cfg.PublishOutboxInstantly = false;
// cfg.OutboxPollingInterval = TimeSpan.FromSeconds(2);
// cfg.InboxRetentionInterval = TimeSpan.FromMinutes(5);
// cfg.InboxMessageTtl = TimeSpan.FromHours(24);
});
Publish events inside your application workflows:
public sealed class UserCreated : IEvent { /* ... */ }
public sealed class UserService
{
private readonly ISeaPublisher _publisher;
private readonly ISeaMessagingDbContext _db; // your DbContext
public async Task CreateUser(CancellationToken ct)
{
// ... persist user
await _publisher.Publish([ new UserCreated(/*...*/) ], ct);
await _db.SaveChangesAsync(ct); // outbox stored in same transaction
}
}
Consume messages by implementing ISeaConsumer<TMessage>:
public interface IUserCreated
{
long Id { get; set; }
string Email { get; set; }
}
public sealed class UserCreatedConsumer : ISeaConsumer<IUserCreated>
{
public Task Consume(IUserCreated message, CancellationToken ct = default)
=> Task.CompletedTask;
}
Background services included:
OutboxPollingPublisher: scans and enqueues unpublished outbox messagesOutboxMessageTransporter: ships payloads to the configured brokerInboxCleaner: retention/TTL of inbox messages
Implement your DbContext for messaging by implementing ISeaMessagingDbContext:
public sealed class AppDbContext : DbContext, ISeaMessagingDbContext
{
public DbSet<OutboxMessage> OutboxMessages { get; set; } = null!;
public DbSet<InboxMessage> InboxMessages { get; set; } = null!;
public DbSet<ConsumedFaultMessage> ConsumedFaultMessages { get; set; } = null!;
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// Optional: override mappings for messaging tables if needed
// modelBuilder.Entity<OutboxMessage>().ToTable("outbox_messages", "messaging");
}
}
Notes:
- The EF interceptor (
OutboxSavedChangesInterceptor) is wired automatically viaUseSeaMessagingwhen you callAddSeaMessaging<TDbContext>(). SeaPublisher.Publish(...)stages outbox records; the actual broker transport happens by background services.- If
PublishOutboxInstantlyis true, messages are sent to broker immediately during save; otherwise polling handles them.
Kafka provider
services.Configure<KafkaConfig>(cfg =>
{
cfg.ClientId = "my-app";
cfg.ConnectionString = "PLAINTEXT://localhost:9092";
cfg.TopicName = "outbox";
cfg.ConsumingTopicNames = new() { "outbox" };
cfg.Partitions = 10;
cfg.ConsumerGroupId = "my-app-group";
cfg.ConcurrentConsumers = 2;
});
RabbitMQ provider
services.Configure<RabbitMqConfig>(cfg =>
{
cfg.ClientId = "my-app";
cfg.ConnectionString = "amqp://user:pass@localhost:5672";
});
Additional utilities (00-Framework)
Package Reference
00-Framework
- Namespaces and key entry points
PubSea.Framework.DomainModel.FrameworkBootstarpperAddSeaEventDispatcher(),UseSeaEventDispatcher()AddSeaMapper(),UseSeaMapper()AddEfUnitOfWork<TDbContext>()AddSeaFileStore(Action<SeaFileStoreConfig>)AddHttpErrorLoggerMessageHandler()
- Middlewares
ExceptionMiddleware,WebErrorHandlerMiddleware
- API response
ApiActionResult,ApiResult,ApiError
- Services
ISnowflakeService,IHashIdService,IDateTimeService,ISeaFileStore
Recommended usage:
builder.Services.AddSeaEventDispatcher();
builder.Services.AddSeaMapper();
builder.Services.AddEfUnitOfWork<AppDbContext>();
builder.Services.AddHttpErrorLoggerMessageHandler();
builder.Services.AddSeaFileStore(cfg => { /* MinIO settings */ });
app.UseSeaEventDispatcher();
app.UseSeaMapper();
See also: Qualifiers/00-RestApi
01-Messaging.EntityFramework
- Bootstrapping:
MessagingEntityFramworkBootstrapper.AddSeaMessaging<TDbContext>(Action<SeaMessagingConfig>) - Configuration:
SeaMessagingConfigPublishOutboxInstantly,OutboxPollingInterval,InboxRetentionInterval,InboxMessageTtl,UseDefaultBroker()
- EF integration
ISeaMessagingDbContextto be implemented by yourDbContextUseSeaMessaging(internal) addsOutboxSavedChangesInterceptor
- Services
ISeaPublisher(stages or directly transports messages)- Background services:
OutboxPollingPublisher,OutboxMessageTransporter,InboxCleaner
- Message consumption
- Implement
ISeaConsumer<TMessage>; consumers are auto-discovered - Interface-based contracts are materialized at runtime and mapped from payloads
- Implement
Minimal setup:
builder.Services.AddSeaMessaging<AppDbContext>(cfg =>
{
// cfg.UseDefaultBroker();
cfg.OutboxPollingInterval = TimeSpan.FromSeconds(2);
});
Domain to integration flow example: see Messaging (EF Outbox/Inbox)
02-Messaging.EntityFramework.Kafka
- Configure via
KafkaConfig(ClientId,ConnectionString,TopicName,ConsumingTopicNames,Partitions,ConsumerGroupId,ConcurrentConsumers) - Supports partitioned publishing and multiple concurrent consumers
services.Configure<KafkaConfig>(cfg =>
{
cfg.ClientId = "my-app";
cfg.ConnectionString = "PLAINTEXT://localhost:9092";
cfg.TopicName = "outbox";
cfg.ConsumingTopicNames = new() { "outbox" };
cfg.Partitions = 10;
cfg.ConsumerGroupId = "my-app-group";
cfg.ConcurrentConsumers = 2;
});
03-Messaging.EntityFramework.RabbitMq
- Configure via
RabbitMqConfig(ClientId,ConnectionString)
services.Configure<RabbitMqConfig>(cfg =>
{
cfg.ClientId = "my-app";
cfg.ConnectionString = "amqp://user:pass@localhost:5672";
});
04-Mediator
- Bootstrapping:
SeaMediatorBootstrapper.AddSeaMediator() - Contracts:
ISeaRequest,ISeaRequest<TResponse> - Handlers:
ISeaRequestHandler<TRequest>,ISeaRequestHandler<TRequest,TResponse> - Operations:
Send,Send<TReq,TRes>,Publish
Notes:
Send<TReq>resolves a single handler;Publish<TReq>invokes all matching handlers (fan‑out)- Auto‑registration scans loaded assemblies for handler implementations
Qualifiers (Samples)
Qualifiers/00-RestApi: end‑to‑end wiring of controllers, mediator, events, mapper, health checks, logging, file store, cachingQualifiers/01-MessagingEfKafka: EF messaging with Kafka; checkProgram.csfor DI wiring
Design Notes
- Outbox pattern is implemented using EF Core interceptor to ensure message persistence within the application transaction
- Consumers are interface‑based to allow schema stability and dynamic contract types at runtime
- Mediator is intentionally minimal and DI‑backed for simplicity and performance
Troubleshooting
- Messages not leaving the DB
- Ensure background services are running:
OutboxPollingPublisher,OutboxMessageTransporter - If using
PublishOutboxInstantly=false, verifyOutboxPollingIntervaland DB connectivity - Confirm your
DbContextimplementsISeaMessagingDbContext
- Ensure background services are running:
- Consumer not invoked
- Verify a single
ISeaConsumer<T>per message type exists (only one is used per contract) - Ensure the message payload type matches the consumer’s generic interface
- Confirm the assembly containing the consumer is loaded at runtime
- Verify a single
InvalidOperationException: No service for type ...- Make sure you called the relevant bootstrappers (
AddSeaMediator,AddSeaEventDispatcher,AddSeaMessaging<TDbContext>, etc.)
- Make sure you called the relevant bootstrappers (
FAQ
Why interface‑based message contracts?
- Interfaces allow generating lightweight runtime types, keeping payload shape stable and independent from concrete classes.
Can I override messaging table names?
- Yes. Use EF Fluent API in your
OnModelCreatingforOutboxMessage,InboxMessage, andConsumedFaultMessage.
- Yes. Use EF Fluent API in your
How do I ensure idempotency?
- Inbox storage with TTL/retention tracks processed messages; design consumers to be idempotent.
Can I use the mediator without messaging?
- Yes. All packages are decoupled; reference only what you need.
Middlewares: exception handler,
WebErrorHandlerMiddlewareAPI response helpers:
ApiActionResult,ApiError,ApiResultHealth checks:
AddCurlHealthCheckfor external endpointsHTTP:
HttpErrorLoggerMessageHandlerMapping:
AddSeaMapper+SeaTypeAdapterConfigshortcutsServices:
AddSnowflakeService,AddHashIdService,AddDateTimeServiceFile store (MinIO):
AddSeaFileStore+ health check integrationCaching & SLO:
AddSeaRedisHybridCache,AddSeaIdentityProviderSlo(seeQualifiers/00-RestApi)
API responses
Wrap controller responses into a unified envelope using ApiActionResult:
[ApiController]
[Route("users")]
public sealed class UsersController : ControllerBase
{
[HttpGet("{id}")]
public IActionResult Get(long id)
{
var dto = new { id, email = "a@b.com" };
return dto.ToActionResult(); // 200 with { result: {...} }
}
[HttpPost]
public IActionResult Create([FromBody] CreateUserRequest rq)
{
var resource = new { id = 1L };
return resource.ToActionResult($"/users/{resource.id}"); // 201 Created
}
}
Error handling middleware
Add the unified error pipeline for REST APIs:
app.UseMiddleware<WebErrorHandlerMiddleware>(); // or app.UseExecptionHandler();
Throw SeaException with codes/trace ids when needed; other exceptions are normalized with consistent structure by the middleware.
Health checks and cURL checks
builder.Services.AddHealthChecks()
.AddCurlHealthCheck([
new HealthCheckEndpoint { Name = "google", Url = "https://google.com" }
]);
app.MapHealthChecks("/healthz", new HealthCheckOptions
{
Predicate = _ => true,
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse,
});
HttpClient error logging
Register the logging handler and add it to your clients:
builder.Services.AddHttpErrorLoggerMessageHandler();
builder.Services.AddHttpClient("backend")
.AddHttpMessageHandler<HttpErrorLoggerMessageHandler>();
var client = httpClientFactory.CreateClient("backend");
Mapster-based mapping
Register Sea mapper and customize mappings via SeaTypeAdapterConfig:
builder.Services.AddSeaMapper(cfg =>
{
cfg.NewConfig<User, UserDto>()
.Map(d => d.Id, s => s.Id)
.Map(d => d.Email, s => s.Email);
});
app.UseSeaMapper(); // adds default local date/time conversions
Snowflake and HashId services
builder.Services.AddSnowflakeService(options =>
{
options.GeneratorId = 7;
options.Epoch = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc);
options.IdStructure = (43, 6, 14);
});
builder.Services.AddHashIdService(options =>
{
options.MinHashLength = 12;
});
public sealed class IdService
{
private readonly ISnowflakeService _snowflake;
private readonly IHashIdService _hash;
public long NewId() => _snowflake.CreateId();
public string Encode(long id) => _hash.Encode(id);
public long Decode(string hash) => _hash.Decode(hash);
}
Unit of Work with EF Core
builder.Services.AddEfUnitOfWork<AppDbContext>();
public sealed class OrderService
{
private readonly IEfUnitOfWork _uow;
public OrderService(IEfUnitOfWork uow)
{
_uow = uow;
}
public async Task PlaceOrder(CancellationToken ct)
{
await _uow.WithExecutionStrategy(async token =>
{
await _uow.BeginTransaction(IsolationLevel.ReadCommitted, token);
try
{
// ... mutate aggregates, add entities
await _uow.SaveChanges(token);
await _uow.CommitTransaction(token);
}
catch
{
await _uow.RollbackTransaction(token);
throw;
}
}, ct);
}
}
File store (MinIO)
builder.Services.AddSeaFileStore(cfg =>
{
cfg.BaseUrl = "http://localhost:9000";
cfg.UserName = "ROOTUSER";
cfg.Password = "CHANGEME123";
cfg.RootName = "users";
});
public sealed class AvatarService
{
private readonly ISeaFileStore _files;
public AvatarService(ISeaFileStore files) => _files = files;
public async Task<string> Upload(byte[] content, CancellationToken ct)
=> await _files.SaveFile("avatars/u1.png", "image/png", content, ct);
public Task<string> GetPutUrl(string key) => _files.ConstructPutPresignedUrl(key, TimeSpan.FromMinutes(5));
public Task<string> GetGetUrl(string key) => _files.ConstructGetPresignedUrl(key, TimeSpan.FromMinutes(5));
}
Build & pack
- Build:
dotnet buildor openSeaFramework.slnin Visual Studio - Pack:
dotnet pack(per project) or runpack.batto produce local NuGet packages
Samples
Qualifiers/00-RestApi: integrates mediator, events, mapper, health checks, logging, file store, cacheQualifiers/01-MessagingEfKafka: EF messaging with Kafka
Contributing
Contributions are welcome. Please keep changes focused, follow existing style, update XML docs where appropriate, and include/adjust a qualifier sample if you change public APIs.
License
This project is licensed under the MIT License. See the LICENSE file for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net9.0
- Confluent.Kafka (>= 2.12.0)
- PubSea.Messaging.EntityFramework (>= 9.2.17)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 10.0.9 | 101 | 3/30/2026 |
| 10.0.8 | 107 | 3/10/2026 |
| 10.0.7 | 108 | 2/26/2026 |
| 10.0.6 | 122 | 1/30/2026 |
| 10.0.5 | 126 | 1/8/2026 |
| 10.0.4 | 119 | 12/29/2025 |
| 10.0.3 | 311 | 12/18/2025 |
| 10.0.2 | 439 | 12/11/2025 |
| 10.0.1 | 220 | 11/26/2025 |
| 10.0.0 | 321 | 11/13/2025 |
| 9.2.18 | 163 | 11/7/2025 |
| 9.2.17 | 184 | 11/7/2025 |
| 9.2.16 | 190 | 11/7/2025 |