Tributary.Kafka.Consumer
1.0.1
dotnet add package Tributary.Kafka.Consumer --version 1.0.1
NuGet\Install-Package Tributary.Kafka.Consumer -Version 1.0.1
<PackageReference Include="Tributary.Kafka.Consumer" Version="1.0.1" />
<PackageVersion Include="Tributary.Kafka.Consumer" Version="1.0.1" />
<PackageReference Include="Tributary.Kafka.Consumer" />
paket add Tributary.Kafka.Consumer --version 1.0.1
#r "nuget: Tributary.Kafka.Consumer, 1.0.1"
#:package Tributary.Kafka.Consumer@1.0.1
#addin nuget:?package=Tributary.Kafka.Consumer&version=1.0.1
#tool nuget:?package=Tributary.Kafka.Consumer&version=1.0.1
Tributary.Kafka.Consumer
A .NET Kafka consumer with attribute-driven topic binding. Wraps Akka.NET Streams Kafka under an opinionated, config-driven API.
dotnet add package Tributary.Kafka.Consumer
Write a consumer
One class, one or more [ConsumeTopic<>]-attributed methods. Topic names come from a strongly-typed config class — refactor-safe via nameof:
public sealed class OrderTopicsConfig
{
public string OrderPlacedTopic { get; set; } = default!;
public string OrderShippedTopic { get; set; } = default!;
}
public sealed class OrdersConsumer : KafkaConsumerBase
{
private readonly ILogger<OrdersConsumer> _log;
public OrdersConsumer(ILogger<OrdersConsumer> log) => _log = log;
[ConsumeTopic<OrderTopicsConfig>(nameof(OrderTopicsConfig.OrderPlacedTopic))]
public Task OnOrderPlaced(EventEnvelope<OrderPlacedEvent> envelope, CancellationToken ct)
{
_log.LogInformation("Order {Id} placed", envelope.Data.OrderId);
return Task.CompletedTask;
}
[ConsumeTopic<OrderTopicsConfig>(nameof(OrderTopicsConfig.OrderShippedTopic))]
public Task OnOrderShipped(OrderShippedEvent dto, CancellationToken ct) // raw DTO also OK
{
_log.LogInformation("Order {Id} shipped", dto.OrderId);
return Task.CompletedTask;
}
}
Bootstrap
await KafkaHost.CreateDefaultBuilder(args)
.ConfigureServices((ctx, services) =>
{
services.Configure<OrderTopicsConfig>(ctx.Configuration.GetSection("OrderTopicsConfig"));
services.AddKafkaConsumer<OrdersConsumer>(ctx.Configuration);
})
.Build()
.RunAsync();
AddKafkaConsumer<T> runs startup validation — it scans the consumer class, validates every [ConsumeTopic<>] resolves to a non-empty topic, every method has a valid signature, and no two methods bind to the same topic. Misconfiguration throws at registration time, not on first message.
Config
{
"Kafka": {
"BootstrapServers": "your-broker:9092",
"SecurityProtocol": "SASL_SSL",
"SaslMechanism": "SCRAM-SHA-256",
"SaslUsername": "your-user",
"SaslPassword": "your-password",
"GroupId": "my-service-orders",
"ClientId": "my-service-consumer",
"AutoOffsetReset": "earliest",
"CommitMode": "AfterHandler"
},
"OrderTopicsConfig": {
"OrderPlacedTopic": "orders.placed",
"OrderShippedTopic": "orders.shipped"
}
}
The consumer applies only the properties you set, so the same code works against Aiven, AWS MSK, Confluent Cloud, or self-hosted Kafka — just change config.
For mTLS, use SslCaLocation, SslCertificateLocation, SslKeyLocation instead of the SASL fields.
Handler signatures
The SDK supports both shapes — detected automatically by parameter type:
// Full envelope: gives you event-id, tenant-id, occurred-at metadata
Task OnPlaced(EventEnvelope<OrderPlacedEvent> envelope, CancellationToken ct);
// Raw DTO: payload only, simpler if you don't need metadata
Task OnPlaced(OrderPlacedEvent dto, CancellationToken ct);
// CancellationToken is optional in both forms
Task OnPlaced(EventEnvelope<OrderPlacedEvent> envelope);
Return Task or Task<T>. Result is ignored. Sync void is rejected at startup.
Commit mode
Two modes, configured via Kafka:CommitMode:
| Mode | Behavior | When to use |
|---|---|---|
AfterHandler (default) |
enable.auto.commit=false. Offset committed only after the handler Task completes successfully. |
Handler does load-bearing work — DB writes, HTTP calls, ledger entries. At-least-once delivery. |
AutoPeriodic |
enable.auto.commit=true. librdkafka commits offsets on a timer. |
Handler is a pure dispatcher to an actor/queue and returns immediately. Fire-and-forget. |
In AfterHandler mode, exceptions in the handler skip the commit, so Kafka redelivers on the next poll — at the cost of potentially blocking partition progress on a poison message until you fix it.
Idempotency (optional)
If you register an IProcessedEventStore, the SDK will:
- Check
envelope.EventIdbefore invoking your handler - Skip if already processed
- Mark as processed after success
services.AddSingleton<IProcessedEventStore, MyRedisProcessedEventStore>();
The SDK ships interface-only — you provide the implementation (Redis, Postgres, in-memory).
Pair with the producer
See Tributary.Kafka.Producer for the matching producer-side SDK.
License
MIT
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Akka.Streams.Kafka (>= 1.5.67)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.8)
- Microsoft.Extensions.DependencyInjection (>= 8.0.1)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Hosting (>= 8.0.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.8)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.8)
- Tributary.Kafka.Abstractions (>= 1.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.1 | 107 | 5/27/2026 |