YAMCqrs.EventBus.Provider.Kafka
10.0.2
Kafka provider for the YAMCqrs.EventBus ecosystem.
This package adds support for publishing and consuming integration events using Apache Kafka as the messaging broker.
The implementation integrates with YAMCqrs.EventBus.Core and uses a decoupled architecture based on persistence, asynchronous processing, and internal background workers.
Installation
dotnet add package YAMCqrs.EventBus.Provider.Kafka
Quick Start
Register Kafka in the dependency injection container:
builder.Services.AddEventBus(opt =>
    {
        // Base EventBus configuration
    })
    .UseKafka(new KafkaConfigurationOptions
    {
        ConnectionString = "cs_Kafka",
        KafkaClientName = "TestApp",
        KafkaGroupName = "TestApp",
    });
Using "cs_Kafka" as the ConnectionString instructs the library to resolve the actual value from ConnectionStrings:Kafka following ADR 13.
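The lookup described above can be pictured with a small sketch. This is not the library's code: the ResolveConnectionString helper, the dictionary standing in for application configuration, the broker address, and the rule that values without the "cs_" prefix are used literally are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for application configuration (for example appsettings.json).
var configuration = new Dictionary<string, string>
{
    ["ConnectionStrings:Kafka"] = "localhost:9092",
};

// Assumed convention: "cs_<Name>" points at the "ConnectionStrings:<Name>" key;
// any other value is treated as a literal connection string.
string ResolveConnectionString(string value)
{
    const string prefix = "cs_";
    if (!value.StartsWith(prefix, StringComparison.Ordinal))
    {
        return value;
    }

    var key = "ConnectionStrings:" + value.Substring(prefix.Length);
    return configuration.TryGetValue(key, out var resolved)
        ? resolved
        : throw new InvalidOperationException($"Missing configuration key '{key}'.");
}

Console.WriteLine(ResolveConnectionString("cs_Kafka"));
Console.WriteLine(ResolveConnectionString("broker-1:9092"));
```

Keeping only the key name in code means the broker address can change per environment without touching the registration call.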
Configuration
KafkaConfigurationOptions
| Property | Description |
|---|---|
| ConnectionString | Connection string used to connect to Kafka. |
| KafkaClientName | Identifier used by Kafka for logs and metrics. |
| KafkaGroupName | Consumer group used for distributed topic consumption. |
| MaxConcurrentConsumers | Maximum number of concurrent consumers listening for messages. |
Main Features
Decoupled persistence
Messages are first persisted and later processed in independent scopes.
Benefits:
- resilience
- asynchronous processing
- infrastructure decoupling
- controlled retries
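A minimal sketch of the persist-then-process idea, assuming an in-memory queue in place of the real persistence layer. RecordIntent, ProcessPending, and the send delegate are hypothetical names, not part of the package API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the persistence layer.
var store = new Queue<(string Topic, string Payload)>();
var sent = new List<string>();

// Step 1: the handler only records the intent to publish.
void RecordIntent(string topic, string payload) => store.Enqueue((topic, payload));

// Step 2: a background worker later drains the store in its own scope.
// A message is removed only after the send delegate succeeds,
// so a failed send is retried on the next pass.
int ProcessPending(Action<string, string> send)
{
    var processed = 0;
    while (store.Count > 0)
    {
        var (topic, payload) = store.Peek();
        try
        {
            send(topic, payload);
        }
        catch (Exception)
        {
            break; // leave the message in the store for the next pass
        }

        store.Dequeue();
        processed++;
    }

    return processed;
}

RecordIntent("kafka.event.test", "{\"Numerito\":1}");
RecordIntent("kafka.event.test", "{\"Numerito\":2}");

// First pass: the simulated broker is unavailable, so nothing is removed.
var firstPass = ProcessPending((_, _) => throw new InvalidOperationException("broker unavailable"));

// Second pass: sending works and both messages go out in order.
var secondPass = ProcessPending((_, payload) => sent.Add(payload));

Console.WriteLine($"{firstPass} then {secondPass} processed");
```

Because the message stays in the store until the send succeeds, a broker outage delays delivery rather than losing the event.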
Safe consumption
Message downloads are content-agnostic.
If a message contains invalid payload data:
- the failure occurs inside the application
- Kafka infinite retry loops are avoided
- offsets can continue advancing
This prevents permanently blocking the topic.
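The separation between downloading and parsing can be sketched as follows. The Download and TryProcess helpers, the in-memory inbox, and the JSON payload format are illustrative assumptions; the package's real storage and serialization are not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

var inbox = new List<byte[]>();   // hypothetical stand-in for persisted raw messages
long committedOffset = -1;

// Download step: stores the bytes untouched and advances the offset.
// It never inspects the payload, so a malformed message cannot block it.
void Download(long offset, byte[] rawValue)
{
    inbox.Add(rawValue);
    committedOffset = offset;
}

// Processing step: parsing happens here, inside the application.
bool TryProcess(byte[] rawValue)
{
    try
    {
        var data = JsonSerializer.Deserialize<Dictionary<string, int>>(rawValue);
        return data is not null && data.ContainsKey("Numerito");
    }
    catch (JsonException)
    {
        return false; // invalid payload: an application-level failure
    }
}

Download(0, Encoding.UTF8.GetBytes("{\"Numerito\":7}"));
Download(1, Encoding.UTF8.GetBytes("not json"));
Download(2, Encoding.UTF8.GetBytes("{\"Numerito\":9}"));

var failures = 0;
foreach (var raw in inbox)
{
    if (!TryProcess(raw))
    {
        failures++;
    }
}

Console.WriteLine($"offset={committedOffset}, failures={failures}");
```

The malformed second message is counted as a processing failure, while the offset has already moved past it.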
Automatic connection retries
Kafka requires all topics to exist before consumers can start.
If a topic is missing:
- the connection retries indefinitely
- failures are logged
- the system self-recovers once the topic exists
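The retry behaviour listed above might look roughly like this. ConnectWithRetryAsync and the tryConnect delegate are hypothetical; a real implementation would call the Kafka client and inspect its errors instead of a boolean, and would log through the application's logger rather than the console.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Retries the connect delegate until it succeeds or the token is cancelled.
async Task<int> ConnectWithRetryAsync(Func<bool> tryConnect, TimeSpan delay, CancellationToken ct)
{
    var attempt = 0;
    while (true)
    {
        ct.ThrowIfCancellationRequested();
        attempt++;
        if (tryConnect())
        {
            return attempt;
        }

        // Each failure is logged before waiting for the next attempt.
        Console.WriteLine($"Attempt {attempt} failed: topic not found, retrying.");
        await Task.Delay(delay, ct);
    }
}

// Simulated broker: the topic "appears" on the third attempt.
var calls = 0;
var attempts = await ConnectWithRetryAsync(
    () => ++calls >= 3,
    TimeSpan.FromMilliseconds(10),
    CancellationToken.None);

Console.WriteLine($"Connected after {attempts} attempts");
```

The cancellation token is the only exit besides success, which matches an indefinite retry that still stops cleanly when the host shuts down.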
Source Generation
The library uses Source Generators to:
- automatically discover topics
- register consumers
- avoid Reflection
- improve startup performance
Dependencies
- Confluent.Kafka
- YAMCqrs.EventBus.Core
Publishing Events
To publish an event:
- Inherit from KafkaPublishEvent
- Define the Topic
- Use IEventPublisher
Actual publishing happens in a separate scope. The handler only registers the publishing intent.
Publish Event Example
internal sealed class MyKafkaPublishEvent : KafkaPublishEvent
{
    // Constant so the topic can be referenced from the subscribing event.
    public const string TopicName = "kafka.event.test";

    public int Numerito { get; init; }

    // No custom headers for this event.
    public override Dictionary<string, string>? GetCustomHeaders()
    {
        return null;
    }

    public override string Topic()
    {
        return TopicName;
    }
}
Publishing the Event
internal sealed class MyLogicCommandHandler(
    IEventPublisher eventPublisher)
    : ICommandHandler<MyLogicCommand, string>
{
    public async Task<Result<string>> HandleAsync(
        MyLogicCommand command,
        CancellationToken cancellationToken = default)
    {
        // Registers the publishing intent; the actual send happens in a separate scope.
        await eventPublisher.PublishAsync(
            new MyKafkaPublishEvent(),
            cancellationToken);

        return Result<string>.Ok(command.Name);
    }
}
Consuming Events
To consume events:
- Inherit from KafkaSubscribeEvent
- Define a constant topic
- Implement ICommandHandler<TEvent, bool>
The topic name must be:
- a string literal
- or a constant reference
This is required for Source Generators to automatically discover topics.
Consume Event Example
internal sealed class MyKafkaSubscribeEvent()
    : KafkaSubscribeEvent(MyKafkaPublishEvent.TopicName)
{
    public int Numerito { get; init; }
}
Event Processing Example
internal sealed partial class MyKafkaSubscribeEventHandler
    : ICommandHandler<MyKafkaSubscribeEvent, bool>
{
    public Task<Result<bool>> HandleAsync(
        MyKafkaSubscribeEvent command,
        CancellationToken cancellationToken = default)
    {
        return Task.FromResult(Result<bool>.Success(true));
    }
}
Architectural Features
- Kafka integration
- Event-driven architecture
- Domain Events
- Integration Events
- Outbox-like processing
- Asynchronous consumers
- Retry support
- Low coupling
- Source-generated discovery
- Reflection-free startup
- Distributed messaging
- Background processing
| Product | Versions (compatible and additional computed target frameworks) |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Confluent.Kafka (>= 2.14.0)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.7)
  - YAMCqrs.Core (>= 10.0.2)
  - YAMCqrs.EventBus.Core (>= 10.0.2)