Itmo.Dev.Platform.Kafka
1.0.60
Prefix Reserved
See the version list below for details.
dotnet add package Itmo.Dev.Platform.Kafka --version 1.0.60
NuGet\Install-Package Itmo.Dev.Platform.Kafka -Version 1.0.60
<PackageReference Include="Itmo.Dev.Platform.Kafka" Version="1.0.60" />
paket add Itmo.Dev.Platform.Kafka --version 1.0.60
#r "nuget: Itmo.Dev.Platform.Kafka, 1.0.60"
// Install Itmo.Dev.Platform.Kafka as a Cake Addin #addin nuget:?package=Itmo.Dev.Platform.Kafka&version=1.0.60 // Install Itmo.Dev.Platform.Kafka as a Cake Tool #tool nuget:?package=Itmo.Dev.Platform.Kafka&version=1.0.60
Itmo.Dev.Platform.Kafka
Platform extensions for working with Kafka
Consumer
Implement message handler
public class MyMessageHandler : IKafkaMessageHandler<int, string>
{
public ValueTask HandleAsync(
IEnumerable<ConsumerKafkaMessage<TKey, TValue>> messages,
CancellationToken cancellationToken)
{
foreach (var message in messages)
{
Console.WriteLine($"Received message, Key = {message.Key}, Value = {message.Value}");
}
return ValueTask.CompletedTask;
}
}
Implement configuration type
public class Configuration : IKafkaConsumerConfiguration
{
public bool IsDisabled { get; init; }
public TimeSpan DisabledConsumerTimeout { get; init; }
public string Host { get; set; } = string.Empty;
public string Topic { get; init; } = string.Empty;
public string Group { get; init; } = string.Empty;
public int ParallelismDegree { get; init; }
public int BufferSize { get; init; }
public TimeSpan BufferWaitLimit { get; init; }
public bool ReadLatest { get; init; }
}
At runtime, configuration would be received as IOptionsMonitor<TConfiguration>
, so you should manually register it
to DI container as options.
Add consumer to your ASP.NET server services
Consumer requires an ASP.NET host, as it is implemented as hosted service
Use extension methods to register your consumer.
collection.AddKafkaConsumer<int, string>(builder => builder
.HandleWith<MyMessageHandler>()
.DeserializeKeyWithNewtonsoft()
.DeserializeValueWithNewtonsoft()
.UseConfiguration<Configuration>());
If you want to use custom deserializer, you can call DeserializeKeyWith<T> where T : IDeserializer<T>
method,
same way with value deserialization.
If your topic model is defined as protobuf, you can call DeserializeKeyWithProto
extension method,
same way with value deserialization.
Producer
Add producer to your service collection
collection.AddKafkaProducer<int, string>(builder => builder
.SerializeKeyWithNewtonsoft()
.SerializeValueWithNewtonsoft()
.UseConfiguration<Configuration>());
You can use protobuf serialization same way as it is with consumer.
Resolve producer instance from service provider
var producer = provider.GetRequiredService<IKafkaMessageProducer<int, string>>();
await producer.ProduceAsync(messages, default);
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net7.0
- Confluent.Kafka (>= 2.2.0)
- Google.Protobuf (>= 3.23.4)
- Itmo.Dev.Platform.Common (>= 1.1.60)
- Microsoft.Extensions.Hosting (>= 7.0.1)
- Microsoft.Extensions.Options (>= 7.0.1)
- Newtonsoft.Json (>= 13.0.3)
- Serilog (>= 3.0.1)
- System.Linq.Async (>= 6.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
2.2.234 | 71 | 11/18/2024 |
2.2.232 | 66 | 11/18/2024 |
2.2.228 | 71 | 10/31/2024 |
2.2.206 | 100 | 10/30/2024 |
2.2.204 | 80 | 10/29/2024 |
2.2.202 | 69 | 10/29/2024 |
2.1.196 | 68 | 10/29/2024 |
2.1.193 | 73 | 10/29/2024 |
2.1.190 | 76 | 10/26/2024 |
2.0.174 | 74 | 10/25/2024 |
2.0.133 | 68 | 10/25/2024 |
2.0.132 | 69 | 10/24/2024 |
2.0.131 | 86 | 10/24/2024 |
2.0.130 | 67 | 10/24/2024 |
2.0.129 | 73 | 10/24/2024 |
2.0.128 | 75 | 10/24/2024 |
2.0.127 | 71 | 10/23/2024 |
2.0.126 | 78 | 10/11/2024 |
2.0.125 | 76 | 10/11/2024 |
2.0.124 | 71 | 10/11/2024 |
2.0.123 | 71 | 10/11/2024 |
2.0.122 | 75 | 10/11/2024 |
2.0.121 | 75 | 10/11/2024 |
2.0.120 | 79 | 10/11/2024 |
2.0.119 | 77 | 10/11/2024 |
2.0.118 | 86 | 10/11/2024 |
2.0.117 | 78 | 10/10/2024 |
2.0.115 | 80 | 10/10/2024 |
2.0.113 | 76 | 10/6/2024 |
2.0.112 | 128 | 8/22/2024 |
2.0.111 | 139 | 5/14/2024 |
2.0.110 | 110 | 5/13/2024 |
2.0.109 | 105 | 5/13/2024 |
2.0.108 | 126 | 5/6/2024 |
2.0.107 | 107 | 5/6/2024 |
2.0.106 | 108 | 5/6/2024 |
2.0.105 | 108 | 5/4/2024 |
2.0.102 | 115 | 5/4/2024 |
2.0.101 | 105 | 4/26/2024 |
2.0.100 | 121 | 4/22/2024 |
2.0.99 | 107 | 4/21/2024 |
2.0.98 | 108 | 4/21/2024 |
2.0.97 | 102 | 4/19/2024 |
2.0.96 | 105 | 4/18/2024 |
2.0.95 | 102 | 4/18/2024 |
2.0.94 | 125 | 2/13/2024 |
2.0.93 | 119 | 2/10/2024 |
2.0.91 | 98 | 2/10/2024 |
2.0.90 | 113 | 2/10/2024 |
2.0.89 | 256 | 2/3/2024 |
2.0.86 | 103 | 2/1/2024 |
1.1.85 | 105 | 1/30/2024 |
1.1.83 | 152 | 1/27/2024 |
1.1.82 | 192 | 1/5/2024 |
1.1.81 | 280 | 1/3/2024 |
1.0.80 | 142 | 12/30/2023 |
1.0.79 | 122 | 12/30/2023 |
1.0.78 | 125 | 12/30/2023 |
1.0.77 | 135 | 12/28/2023 |
1.0.76 | 122 | 12/27/2023 |
1.0.75 | 158 | 12/9/2023 |
1.0.73 | 160 | 11/30/2023 |
1.0.72 | 140 | 11/27/2023 |
1.0.71 | 139 | 11/27/2023 |
1.0.70 | 128 | 11/27/2023 |
1.0.69 | 110 | 11/27/2023 |
1.0.67 | 142 | 11/25/2023 |
1.0.66 | 129 | 11/25/2023 |
1.0.65 | 135 | 11/23/2023 |
1.0.64 | 123 | 11/23/2023 |
1.0.62 | 127 | 11/18/2023 |
1.0.61 | 131 | 11/18/2023 |
1.0.60 | 118 | 11/17/2023 |
1.0.59 | 120 | 11/16/2023 |
1.0.58 | 116 | 11/13/2023 |
1.0.57 | 107 | 11/12/2023 |
1.0.56 | 118 | 11/12/2023 |
1.0.54 | 123 | 11/7/2023 |
1.0.53 | 125 | 11/4/2023 |
1.0.52 | 152 | 10/29/2023 |
1.0.51 | 135 | 10/29/2023 |
1.0.50 | 143 | 10/29/2023 |
1.0.48 | 127 | 10/29/2023 |
Added options post configure