EventStreamProcessing.Kafka
1.0.0
dotnet add package EventStreamProcessing.Kafka --version 1.0.0
NuGet\Install-Package EventStreamProcessing.Kafka -Version 1.0.0
<PackageReference Include="EventStreamProcessing.Kafka" Version="1.0.0" />
paket add EventStreamProcessing.Kafka --version 1.0.0
#r "nuget: EventStreamProcessing.Kafka, 1.0.0"
// Install EventStreamProcessing.Kafka as a Cake Addin
#addin nuget:?package=EventStreamProcessing.Kafka&version=1.0.0
// Install EventStreamProcessing.Kafka as a Cake Tool
#tool nuget:?package=EventStreamProcessing.Kafka&version=1.0.0
Event Stream Processing Micro-Framework
Single event stream processing micro-framework for Apache Kafka using .NET Core
Introduction
This framework provides a set of interfaces and abstract base classes for building an event stream processing pipeline. These are contained in the EventStreamProcessing.Abstractions package, are generic in nature, and are not tied to any one streaming platform, such as Apache Kafka. To use these abstractions, create a class that extends `EventProcessor` and supply the required consumers, producers and message handlers.
While the abstractions are not coupled to any streaming platform, the EventStreamProcessing.Kafka package provides an implementation that uses the Confluent.Kafka package to read and write event streams using Apache Kafka.
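The consumer, handlers and producer arrangement described above can be sketched in a few lines of plain C#. This is only an illustration of the pattern, not the framework's API: every type here (`ISketchConsumer`, `ISketchProducer`, `SketchHandler`, `SketchProcessor` and the in-memory implementations) is hypothetical, and the real interfaces in EventStreamProcessing.Abstractions may look different.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the abstractions; not the package's real types.
public interface ISketchConsumer { bool TryConsume(out string item); }
public interface ISketchProducer { void Produce(string item); }

// A handler transforms the event and returns false to drop it.
public delegate bool SketchHandler(string input, out string output);

// In-memory source, standing in for a streaming platform consumer.
public sealed class QueueConsumer : ISketchConsumer
{
    private readonly Queue<string> _items;
    public QueueConsumer(IEnumerable<string> items) => _items = new Queue<string>(items);

    public bool TryConsume(out string item)
    {
        if (_items.Count > 0) { item = _items.Dequeue(); return true; }
        item = string.Empty;
        return false;
    }
}

// In-memory sink, standing in for a streaming platform producer.
public sealed class ListProducer : ISketchProducer
{
    public List<string> Sent { get; } = new List<string>();
    public void Produce(string item) => Sent.Add(item);
}

public sealed class SketchProcessor
{
    private readonly ISketchConsumer _consumer;
    private readonly ISketchProducer _producer;
    private readonly SketchHandler[] _handlers;

    public SketchProcessor(ISketchConsumer consumer, ISketchProducer producer,
        params SketchHandler[] handlers)
    {
        _consumer = consumer;
        _producer = producer;
        _handlers = handlers;
    }

    // Processes one event; returns false once the source is exhausted.
    public bool ProcessOne()
    {
        if (!_consumer.TryConsume(out var value)) return false;
        foreach (var handler in _handlers)
        {
            if (!handler(value, out value)) return true; // dropped by a handler
        }
        _producer.Produce(value);
        return true;
    }
}

public static class PipelineDemo
{
    public static List<string> Run()
    {
        var consumer = new QueueConsumer(new[] { "a", "skip", "b" });
        var producer = new ListProducer();
        var processor = new SketchProcessor(consumer, producer,
            (string v, out string o) => { o = v; return v != "skip"; },            // filter
            (string v, out string o) => { o = v.ToUpperInvariant(); return true; }); // transform
        while (processor.ProcessOne()) { }
        return producer.Sent;
    }
}
```

Calling `PipelineDemo.Run()` drains the in-memory source through both handlers. Keeping the processor ignorant of where events come from and go to is what would let a Kafka-backed consumer and producer be swapped in without touching the handlers.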
Sample Description
The best way to become familiar with this framework is to examine the EventStreamProcessing.Sample.Worker project in the samples folder. You can use Docker to run a local instance of the Kafka broker, then run the sample worker, consumer and producer apps.
Here is a diagram depicting how an event stream is processed by the Sample Worker service to validate, enrich and filter messages before writing them back to Kafka.
- The Sample Producer console app lets the user write a stream of events to the Kafka broker using the "raw-events" topic. The numeral represents the event key, and the text "Hello World" represents the event value.
- The Sample Worker service injects an `IEventProcessor` into the `KafkaWorker` class constructor. The `ExecuteAsync` method then calls `eventProcessor.Process` in a `while` loop until the operation is cancelled.
- The `Program.CreateHostBuilder` method registers an `IEventProcessor` for dependency injection with a `KafkaEventProcessor` that uses `KafkaEventConsumer`, `KafkaEventProducer` and an array of `MessageHandler` with `ValidationHandler`, `EnrichmentHandler` and `FilterHandler`.
    // Add event processor
    services.AddSingleton<IEventProcessor>(sp =>
    {
        // Create logger, consumer, producers
        var logger = sp.GetRequiredService<ILogger>();
        // consumerOptions is not defined in this excerpt; it is assumed to be in scope
        var kafkaConsumer = KafkaUtils.CreateConsumer(
            consumerOptions.Brokers, consumerOptions.TopicsList,
            sp.GetRequiredService<ILogger>());
        var producerOptions = sp.GetRequiredService<ProducerOptions>();
        var kafkaErrorProducer = KafkaUtils.CreateProducer(
            producerOptions.Brokers, producerOptions.ValidationTopic,
            sp.GetRequiredService<ILogger>());
        var kafkaFinalProducer = KafkaUtils.CreateProducer(
            producerOptions.Brokers, producerOptions.FinalTopic,
            sp.GetRequiredService<ILogger>());

        // Create handlers
        var handlers = new List<MessageHandler>
        {
            new ValidationHandler(
                sp.GetRequiredService<IDictionary<int, string>>(),
                new KafkaEventProducer<int, string>(kafkaErrorProducer, producerOptions.ValidationTopic, logger),
                logger),
            new EnrichmentHandler(
                sp.GetRequiredService<IDictionary<int, string>>(), logger),
            new FilterHandler(
                m => !m.Value.Contains("Hello"), logger) // Filter out English greetings
        };

        // Create event processor
        return new KafkaEventProcessor<int, string, int, string>(
            new KafkaEventConsumer<int, string>(kafkaConsumer, logger),
            new KafkaEventProducer<int, string>(kafkaFinalProducer, producerOptions.FinalTopic, logger),
            handlers.ToArray());
    });
- The `KafkaEventConsumer` in Sample Worker subscribes to the "raw-events" topic of the Kafka broker running on `localhost:9092`. The message handlers validate, enrich and filter the events one at a time. If there are validation errors, those are written back to Kafka with a "validation-errors" topic. This takes place if the message key does not correlate to a key in the language store. The `EnrichmentHandler` looks up a translation for "Hello" in the language store and transforms the message with the selected translation. The `FilterHandler` accepts a lambda expression for filtering messages. In this case the English phrase "Hello" is filtered out. Lastly, the `KafkaEventProducer` writes processed events back to Kafka using the "final-events" topic.
- The Sample Consumer console app reads the "validation-errors" and "final-events" topics, displaying them in the console.
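The validate, enrich and filter steps in the list above can be traced with a small standalone sketch. It does not use the sample's handler classes or Kafka at all; the `HandlerSketch` class, the contents of the language store, the error message text and the convention of returning an empty topic for a filtered event are all assumptions made for illustration. Only the topic names and the filter predicate come from the description above.

```csharp
using System;
using System.Collections.Generic;

public static class HandlerSketch
{
    // Hypothetical language store: event key -> greeting.
    // The sample's actual store contents are not shown in this document.
    private static readonly IDictionary<int, string> Languages =
        new Dictionary<int, string>
        {
            [1] = "Hello",
            [2] = "Hola",
            [3] = "Bonjour"
        };

    // Returns the topic an event would be written to and its final value.
    // An empty topic means the event was filtered out (a convention of this sketch).
    public static (string Topic, string Value) Route(int key, string value)
    {
        // Validate: the key must correlate to a key in the language store.
        if (!Languages.TryGetValue(key, out var greeting))
            return ("validation-errors", "No language found for key " + key);

        // Enrich: replace the English greeting with the selected translation.
        var enriched = value.Replace("Hello", greeting);

        // Filter: same predicate as the sample, keep events without "Hello".
        Func<string, bool> keep = v => !v.Contains("Hello");
        if (!keep(enriched))
            return (string.Empty, enriched);

        return ("final-events", enriched);
    }
}
```

Under these assumptions, `HandlerSketch.Route(2, "Hello World")` is routed to "final-events" with the value "Hola World", key 1 leaves the English greeting in place and is filtered out, and a key missing from the store (for example 9) goes to "validation-errors".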
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
Dependencies
- .NETStandard 2.0
  - Confluent.Kafka (>= 1.4.3)
  - EventStreamProcessing.Abstractions (>= 1.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 3.1.5)
Version | Downloads | Last updated |
---|---|---|
1.0.0 | 1,527 | 6/25/2020 |