EventStreamProcessing.Kafka 1.0.0

.NET CLI:
    dotnet add package EventStreamProcessing.Kafka --version 1.0.0

Package Manager (run in the Visual Studio Package Manager Console, since it uses the NuGet module's version of Install-Package):
    NuGet\Install-Package EventStreamProcessing.Kafka -Version 1.0.0

PackageReference (for projects that support PackageReference, copy this XML node into the project file):
    <PackageReference Include="EventStreamProcessing.Kafka" Version="1.0.0" />

Paket CLI:
    paket add EventStreamProcessing.Kafka --version 1.0.0

Script & Interactive (the #r directive works in F# Interactive and Polyglot Notebooks; copy it into the interactive tool or the script's source code):
    #r "nuget: EventStreamProcessing.Kafka, 1.0.0"

Cake:
    // Install EventStreamProcessing.Kafka as a Cake Addin
    #addin nuget:?package=EventStreamProcessing.Kafka&version=1.0.0

    // Install EventStreamProcessing.Kafka as a Cake Tool
    #tool nuget:?package=EventStreamProcessing.Kafka&version=1.0.0

Event Stream Processing Micro-Framework

A micro-framework for single-event stream processing with Apache Kafka on .NET Core

Introduction

This framework provides a set of interfaces and abstract base classes for building an event stream processing pipeline. These are contained in the EventStreamProcessing.Abstractions package; they are generic and not tied to any particular streaming platform, such as Apache Kafka. To use these abstractions, create a class that extends EventProcessor and supply the required consumers, producers and message handlers.

While the abstractions are not coupled to any streaming platform, the EventStreamProcessing.Kafka package provides an implementation that uses the Confluent.Kafka package to read and write event streams using Apache Kafka.

Sample Description

The best way to become familiar with this framework is to examine the EventStreamProcessing.Sample.Worker project in the samples folder. You can use Docker to run a local instance of the Kafka broker, then run the sample worker, consumer and producer apps.

Here is a diagram depicting how an event stream is processed by the Sample Worker service to validate, enrich and filter messages before writing them back to Kafka.

  1. The Sample Producer console app lets the user write a stream of events to the Kafka broker using the "raw-events" topic. In each event, the numeral represents the event key, and the text "Hello World" represents the event value.
  2. The Sample Worker service injects an IEventProcessor into the KafkaWorker class constructor. The ExecuteAsync method then calls eventProcessor.Process in a while loop until the operation is cancelled.
  3. The Program.CreateHostBuilder method registers a KafkaEventProcessor as the IEventProcessor for dependency injection. The processor uses a KafkaEventConsumer, a KafkaEventProducer and an array of MessageHandler containing a ValidationHandler, an EnrichmentHandler and a FilterHandler.
// Add event processor
services.AddSingleton<IEventProcessor>(sp =>
{
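    // Create logger, consumer, producers.
    // consumerOptions is not defined in this excerpt; it is assumed to be
    // resolved beforehand, in the same way as producerOptions below.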
    var logger = sp.GetRequiredService<ILogger>();
    var kafkaConsumer = KafkaUtils.CreateConsumer(
        consumerOptions.Brokers, consumerOptions.TopicsList,
        sp.GetRequiredService<ILogger>());
    var producerOptions = sp.GetRequiredService<ProducerOptions>();
    var kafkaErrorProducer = KafkaUtils.CreateProducer(
        producerOptions.Brokers, producerOptions.ValidationTopic,
        sp.GetRequiredService<ILogger>());
    var kafkaFinalProducer = KafkaUtils.CreateProducer(
        producerOptions.Brokers, producerOptions.FinalTopic,
        sp.GetRequiredService<ILogger>());

    // Create handlers
    var handlers = new List<MessageHandler>
    {
        new ValidationHandler(
            sp.GetRequiredService<IDictionary<int, string>>(),
            new KafkaEventProducer<int, string>(kafkaErrorProducer, producerOptions.ValidationTopic, logger),
            logger),
        new EnrichmentHandler(
            sp.GetRequiredService<IDictionary<int, string>>(), logger),
        new FilterHandler(
            m => !m.Value.Contains("Hello"), logger) // Filter out English greetings
    };

    // Create event processor
    return new KafkaEventProcessor<int, string, int, string>(
        new KafkaEventConsumer<int, string>(kafkaConsumer, logger),
        new KafkaEventProducer<int, string>(kafkaFinalProducer, producerOptions.FinalTopic, logger),
        handlers.ToArray());
});
  4. The KafkaEventConsumer in Sample Worker subscribes to the "raw-events" topic of the Kafka broker running on localhost:9092. The message handlers then validate, enrich and filter the events one at a time. The ValidationHandler writes a validation error back to Kafka on the "validation-errors" topic when the message key does not match a key in the language store. The EnrichmentHandler looks up a translation for "Hello" in the language store and transforms the message with the selected translation. The FilterHandler accepts a lambda expression for filtering messages; in this case, messages containing the English phrase "Hello" are filtered out. Lastly, the KafkaEventProducer writes the processed events back to Kafka on the "final-events" topic.
  5. The Sample Consumer console app reads the "validation-errors" and "final-events" topics and displays their events in the console.
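
The validate, enrich and filter steps described above can be illustrated without a Kafka broker. The following is a minimal sketch, not the package's implementation: the Message class, the PipelineSketch.Process method, the error text and the contents of the language store are all hypothetical names and values chosen for illustration. Only the filter predicate is taken from the sample registration code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an event; not a type from the package.
public sealed class Message
{
    public Message(int key, string value) { Key = key; Value = value; }
    public int Key { get; }
    public string Value { get; }
}

public static class PipelineSketch
{
    // Runs one event through validate -> enrich -> filter.
    // Returns the processed message, or null when the event is dropped.
    public static Message Process(
        Message message,
        IDictionary<int, string> languageStore,
        Func<Message, bool> filter,
        ICollection<string> validationErrors)
    {
        // Validate: the message key must exist in the language store.
        // In the sample, errors go to the "validation-errors" topic instead.
        if (!languageStore.TryGetValue(message.Key, out var translation))
        {
            validationErrors.Add($"No language for key {message.Key}");
            return null;
        }

        // Enrich: replace "Hello" with the selected translation.
        var enriched = new Message(
            message.Key, message.Value.Replace("Hello", translation));

        // Filter: keep only messages that satisfy the predicate.
        return filter(enriched) ? enriched : null;
    }

    public static void Main()
    {
        // Assumed language store contents, for illustration only.
        var languageStore = new Dictionary<int, string>
        {
            [1] = "Hello", [2] = "Hola", [3] = "Bonjour"
        };
        var errors = new List<string>();

        // Same predicate as in the sample: filter out English greetings.
        Func<Message, bool> filter = m => !m.Value.Contains("Hello");

        foreach (var key in new[] { 1, 2, 3, 9 })
        {
            var result = Process(
                new Message(key, "Hello World"), languageStore, filter, errors);
            Console.WriteLine(
                result == null ? $"{key}: dropped" : $"{key}: {result.Value}");
        }
        Console.WriteLine(string.Join("; ", errors));
    }
}
```

Under these assumptions, keys 2 and 3 yield "Hola World" and "Bonjour World", key 1 is filtered out because its value still contains "Hello", and key 9 is dropped with a validation error. In the sample itself, each of these steps is a separate MessageHandler, and the results are written to Kafka topics rather than to the console.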
Product: compatible and additional computed target framework versions (netstandard2.0 is compatible; all other versions listed were computed)
  .NET: net5.0, net5.0-windows, net6.0, net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows
  .NET Core: netcoreapp2.0, netcoreapp2.1, netcoreapp2.2, netcoreapp3.0, netcoreapp3.1
  .NET Standard: netstandard2.0, netstandard2.1
  .NET Framework: net461, net462, net463, net47, net471, net472, net48, net481
  MonoAndroid: monoandroid
  MonoMac: monomac
  MonoTouch: monotouch
  Tizen: tizen40, tizen60
  Xamarin.iOS: xamarinios
  Xamarin.Mac: xamarinmac
  Xamarin.TVOS: xamarintvos
  Xamarin.WatchOS: xamarinwatchos


Version  Downloads  Last updated
1.0.0    1,527      6/25/2020