YAMCqrs.EventBus.Provider.Kafka 10.0.2


YAMCqrs.EventBus.Provider.Kafka

Documentación en español

Kafka provider for the YAMCqrs.EventBus ecosystem.

This package adds support for publishing and consuming integration events using Apache Kafka as the messaging broker.

The implementation integrates with YAMCqrs.EventBus.Core and uses a decoupled architecture based on persistence, asynchronous processing, and internal background workers.


⚙️ Installation

dotnet add package YAMCqrs.EventBus.Provider.Kafka

🚀 Quick Start

Register Kafka in the dependency injection container:

builder.Services.AddEventBus(opt =>
{
    // Base EventBus configuration
})
.UseKafka(new KafkaConfigurationOptions()
{
    ConnectionString = "cs_Kafka",
    KafkaClientName = "TestApp",
    KafkaGroupName = "TestApp",
});

Using "cs_Kafka" as the ConnectionString instructs the library to resolve the actual value from ConnectionStrings:Kafka following ADR 13.
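
The following is a minimal sketch of how a "cs_" reference could be mapped to a configured connection string. ConnectionStringResolver and its Resolve method are hypothetical names used for illustration only and are not part of YAMCqrs; the behavior for values without the prefix (returned unchanged) is also an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of YAMCqrs: illustrates the "cs_<Name>" convention.
public static class ConnectionStringResolver
{
    private const string Prefix = "cs_";

    // Returns the configured value for a "cs_<Name>" reference,
    // or the input unchanged when it does not use the prefix (assumption).
    public static string Resolve(
        string value,
        IReadOnlyDictionary<string, string> connectionStrings)
    {
        if (!value.StartsWith(Prefix, StringComparison.Ordinal))
        {
            return value;
        }

        // "cs_Kafka" -> "Kafka", looked up as ConnectionStrings:Kafka.
        string name = value.Substring(Prefix.Length);
        if (!connectionStrings.TryGetValue(name, out var resolved))
        {
            throw new KeyNotFoundException(
                $"ConnectionStrings:{name} is not configured.");
        }

        return resolved;
    }
}
```

With a dictionary that maps Kafka to localhost:9092, Resolve("cs_Kafka", ...) returns localhost:9092, and a value without the prefix is returned as given.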


⚙️ Configuration

KafkaConfigurationOptions

  • ConnectionString: connection string used to connect to Kafka.

  • KafkaClientName: identifier used by Kafka for logs and metrics.

  • KafkaGroupName: consumer group used for distributed topic consumption.

  • MaxConcurrentConsumers: maximum number of concurrent consumers listening for messages.


🛠️ Main Features

Decoupled persistence

Messages are first persisted and later processed in independent scopes.

Benefits:

  • resilience
  • asynchronous processing
  • infrastructure decoupling
  • controlled retries
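
The persist-then-process idea can be sketched with an in-memory store. This is only an illustration under stated assumptions: PendingMessage, PendingMessageStore, and the maxAttempts limit are hypothetical and do not reflect the library's actual persistence layer or retry policy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the persistence layer.
public sealed record PendingMessage(string Topic, string Payload, int Attempts = 0);

public sealed class PendingMessageStore
{
    private readonly Queue<PendingMessage> _pending = new();

    public int Count => _pending.Count;

    // Step 1: the request scope only records what should be sent.
    public void Save(string topic, string payload) =>
        _pending.Enqueue(new PendingMessage(topic, payload));

    // Step 2: a background worker drains the store in its own scope.
    // Failed sends are re-queued until maxAttempts is reached, then dropped
    // (a real system would likely move them to a dead-letter store instead).
    public int Drain(Func<PendingMessage, bool> trySend, int maxAttempts = 3)
    {
        int sent = 0;
        int batch = _pending.Count; // only what was pending when Drain started
        for (int i = 0; i < batch; i++)
        {
            PendingMessage message = _pending.Dequeue();
            if (trySend(message))
            {
                sent++;
            }
            else if (message.Attempts + 1 < maxAttempts)
            {
                _pending.Enqueue(message with { Attempts = message.Attempts + 1 });
            }
        }

        return sent;
    }
}
```

Because saving and sending are separate steps, a broker outage affects only Drain: the message stays in the store and is retried on a later pass.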

Safe consumption

Messages are downloaded from Kafka without inspecting their payload.

If a message contains invalid payload data:

  • the failure occurs inside the application
  • Kafka infinite retry loops are avoided
  • offsets can continue advancing

This prevents permanently blocking the topic.
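
A minimal sketch of this separation, assuming JSON payloads: the download step stores raw text and advances the offset, and parsing happens later in application code. SafeConsumptionSketch and its members are hypothetical names, not the library's API.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical illustration of content-agnostic download followed by in-app parsing.
public static class SafeConsumptionSketch
{
    // Stores every raw payload as-is and returns the next offset.
    // Nothing is parsed here, so a malformed payload cannot stall the offset.
    public static long Download(
        IEnumerable<string> rawMessages,
        long nextOffset,
        ICollection<string> inbox)
    {
        foreach (string raw in rawMessages)
        {
            inbox.Add(raw);
            nextOffset++;
        }

        return nextOffset;
    }

    // Runs later, inside the application: invalid payloads fail here,
    // without involving the broker.
    public static (int Processed, int Failed) Process(IEnumerable<string> inbox)
    {
        int processed = 0;
        int failed = 0;
        foreach (string raw in inbox)
        {
            try
            {
                using JsonDocument document = JsonDocument.Parse(raw);
                processed++;
            }
            catch (JsonException)
            {
                failed++;
            }
        }

        return (processed, failed);
    }
}
```

In this sketch a batch containing one valid and one malformed payload still moves the offset forward by two; the malformed one is only counted as a failure when Process runs.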


Automatic connection retries

Consumers can only start once every topic they subscribe to exists.

If a topic is missing:

  • the connection retries indefinitely
  • failures are logged
  • the system self-recovers once the topic exists
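
The retry behavior listed above can be sketched as a loop that logs each failure and waits before trying again. ConnectionRetrySketch, the tryConnect delegate, and the fixed delay are assumptions made for illustration; the library's real retry timing and logging are not documented here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of retry-until-available; not the library's implementation.
public static class ConnectionRetrySketch
{
    // Calls tryConnect until it succeeds and returns the number of attempts made.
    // Stops only on success or when the token is cancelled.
    public static async Task<int> ConnectWithRetryAsync(
        Func<bool> tryConnect,
        TimeSpan delay,
        Action<string> log,
        CancellationToken cancellationToken = default)
    {
        int attempts = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            attempts++;

            if (tryConnect())
            {
                return attempts; // e.g. the missing topic now exists
            }

            log($"Connection attempt {attempts} failed; retrying.");
            await Task.Delay(delay, cancellationToken);
        }
    }
}
```

The cancellation token is what lets a host shut the loop down cleanly while the topic is still missing.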

Source Generation

The library uses Source Generators to:

  • automatically discover topics
  • register consumers
  • avoid Reflection
  • improve startup performance

📋 Dependencies

  • Confluent.Kafka
  • YAMCqrs.EventBus.Core

📤 Publishing Events

To publish an event:

  1. Inherit from KafkaPublishEvent
  2. Define the Topic
  3. Use IEventPublisher

Actual publishing happens in a separate scope. The handler only registers the publishing intent.


💡 Publish Event Example

internal sealed class MyKafkaPublishEvent : KafkaPublishEvent
{
    public const string TopicName = "kafka.event.test";

    public int Numerito { get; init; }

    public override Dictionary<string, string>? GetCustomHeaders()
    {
        return null;
    }

    public override string Topic()
    {
        return TopicName;
    }
}

💡 Publishing the Event

internal sealed class MyLogicCommandHandler(
    IEventPublisher eventPublisher)
    : ICommandHandler<MyLogicCommand, string>
{
    public async Task<Result<string>> HandleAsync(
        MyLogicCommand command,
        CancellationToken cancellationToken = default)
    {
        await eventPublisher.PublishAsync(
            new MyKafkaPublishEvent(),
            cancellationToken);

        return Result<string>.Ok(command.Name);
    }
}

📥 Consuming Events

To consume events:

  1. Inherit from KafkaSubscribeEvent
  2. Define a constant topic
  3. Implement ICommandHandler<TEvent, bool>

The topic name must be:

  • a string literal
  • or a constant reference

This is required for Source Generators to automatically discover topics.


💡 Consume Event Example

internal sealed class MyKafkaSubscribeEvent()
    : KafkaSubscribeEvent(MyKafkaPublishEvent.TopicName)
{
    public int Numerito { get; init; }
}

💡 Event Processing Example

internal sealed partial class MyKafkaSubscribeEventHandler
    : ICommandHandler<MyKafkaSubscribeEvent, bool>
{
    public Task<Result<bool>> HandleAsync(
        MyKafkaSubscribeEvent command,
        CancellationToken cancellationToken = default)
    {
        return Task.FromResult(Result<bool>.Success(true));
    }
}

⚡ Architectural Features

  • Kafka integration
  • Event-driven architecture
  • Domain Events
  • Integration Events
  • Outbox-like processing
  • Asynchronous consumers
  • Retry support
  • Low coupling
  • Source-generated discovery
  • Reflection-free startup
  • Distributed messaging
  • Background processing

Target frameworks

.NET: net10.0 is compatible. Computed: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version   Downloads   Last Updated
10.0.2    34          5/20/2026
10.0.1    87          5/18/2026
10.0.0    88          5/17/2026