Confluent.Kafka.Dataflow 2.0.0

.NET CLI:
dotnet add package Confluent.Kafka.Dataflow --version 2.0.0

Package Manager (Visual Studio Package Manager Console):
NuGet\Install-Package Confluent.Kafka.Dataflow -Version 2.0.0

PackageReference (for projects that support it, add to the project file):
<PackageReference Include="Confluent.Kafka.Dataflow" Version="2.0.0" />

Paket CLI:
paket add Confluent.Kafka.Dataflow --version 2.0.0

F# Interactive / Polyglot Notebooks:
#r "nuget: Confluent.Kafka.Dataflow, 2.0.0"

Cake:
// As a Cake Addin
#addin nuget:?package=Confluent.Kafka.Dataflow&version=2.0.0
// As a Cake Tool
#tool nuget:?package=Confluent.Kafka.Dataflow&version=2.0.0

Kafka Dataflow

An extension of Confluent's Kafka client for use with System.Threading.Tasks.Dataflow.

Features

  • Represent consumers/producers as dataflow blocks.
  • Process Kafka messages using a dataflow pipeline.
  • Configure transactional producing and exactly-once (EoS) stream processing.

Installation

Add the NuGet package to your project:

$ dotnet add package Confluent.Kafka.Dataflow

Usage

Consuming using ISourceBlock<T>

Use IConsumer<TKey,TValue>.AsSourceBlock(...) to initialize a Kafka message pipeline.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Confluent.Kafka;
using Confluent.Kafka.Dataflow;

using var consumer = new ConsumerBuilder<string, string>(
    new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "example",
    }).Build();

consumer.Subscribe("my-topic");

// Define a target block to process Kafka messages.
var processor = new ActionBlock<Message<string, string>>(
    message => Console.WriteLine($"Message received: {message.Timestamp}"));

// Initialize source and link to target.
var blockOptions = new DataflowBlockOptions
{
    // It's a good idea to limit buffered messages (in case processing falls behind).
    // Otherwise, all messages are offered as soon as they are available.
    BoundedCapacity = 8,
};

var source = consumer.AsSourceBlock(options: blockOptions);
source.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

// Optionally, request to stop processing.
await Task.Delay(10000);
source.Complete();

// Wait for processing to finish.
await processor.Completion;
consumer.Close();

Committing message offsets

The Kafka client auto-commits offsets periodically by default. It can automatically store/queue message offsets for the next commit as soon as the messages are loaded into memory.

Alternatively, you can set enable.auto.offset.store to false and store offsets manually after processing is finished. This prevents unprocessed messages from being committed in exceptional scenarios.
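
For example, a consumer configured for manual offset storage might look like this (a minimal sketch; EnableAutoOffsetStore is the strongly typed ConsumerConfig property corresponding to enable.auto.offset.store):

using var consumer = new ConsumerBuilder<string, string>(
    new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "example",

        // Offsets are still committed periodically, but only the ones
        // stored explicitly after processing.
        EnableAutoOffsetStore = false,
    }).Build();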


// Use a transform block to emit processed messages.
var processor = new TransformBlock<Message<string, string>, Message<string, string>>(
    async message =>
    {
        // Process message asynchronously.
        // ...
        return message;
    },
    new ExecutionDataflowBlockOptions
    {
        // Parallelism is OK as long as order is preserved.
        MaxDegreeOfParallelism = 8,
        EnsureOrdered = true,
    });

// Link the processor to the source and commit target.
var source = consumer.AsSourceBlock(out var commitTarget, options: blockOptions);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
source.LinkTo(processor, linkOptions);
processor.LinkTo(commitTarget, linkOptions);

Producing using ITargetBlock<T>

Use IProducer<TKey, TValue>.AsTargetBlock(...) to direct a message pipeline into a destination Kafka topic:

using System.Threading.Tasks.Dataflow;
using Confluent.Kafka;
using Confluent.Kafka.Dataflow;

using var producer = new ProducerBuilder<string, string>(
    new ProducerConfig
    {
        BootstrapServers = "localhost:9092",
    }).Build();

var target = producer.AsTargetBlock(new TopicPartition("my-topic", Partition.Any));

var generator = new TransformBlock<int, Message<string, string>>(
    i => new Message<string, string>
    {
        Key = i.ToString(),
        Value = $"Value #{i}"
    });

generator.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

for (var i = 0; i < 10; i++)
{
    generator.Post(i);
}

generator.Complete();
await target.Completion;

Transactions and stream processing using TargetBuilder<T>

Kafka supports producing messages and committing consumer offsets within a single transaction. A common use case is stream processing: the offset of a processed message is committed atomically with the messages produced from it ("exactly-once semantics").
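
Note that transactional producing requires a transactional ID on the producer, so the producer used below (myProducer) should be built with a configuration along these lines (a minimal sketch; the ID value is an arbitrary example):

var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",

    // Corresponds to transactional.id; required for transactional producing.
    TransactionalId = "order-processor",
};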

Represent all the data you want included in the same transaction as a custom data type; you can then use TargetBuilder<T> to create a single composite target:


// Payment and shipping requests should be created transactionally.
class OrderData
{
    public OrderData(PaymentRequest payment, ShippingRequest shipping)
    {
        this.Payment = payment;
        this.Shipping = shipping;
    }

    public PaymentRequest Payment { get; }

    public ShippingRequest Shipping { get; }
}

// Configure mappings to Kafka messages to be produced.
var builder = new TargetBuilder<OrderData>()
    .WithMessages(
        myProducer,
        order => new[] { new Message<Null, PaymentRequest> { Value = order.Payment } },
        new TopicPartition("payment-requests", Partition.Any))
    .WithMessages(
        myProducer,
        order => new[] { new Message<Null, ShippingRequest> { Value = order.Shipping } },
        new TopicPartition("shipping-requests", Partition.Any));

// Optionally, link a Kafka source to include the processed offset for each order.
// Payment and shipping will be created if and only if the order is committed.
builder.AsStream(myConsumer, out var orderSource);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
var orderProcessor = new TransformBlock<OrderRequest, OrderData>(this.ProcessOrder);
orderSource.LinkTo(orderProcessor, linkOptions);

// Finish linking the Kafka stream.
var orderTarget = builder.Build();
orderProcessor.LinkTo(orderTarget, linkOptions);

await orderTarget.Completion;

Compatible and computed target frameworks

  • .NET: net5.0 (compatible); net5.0-windows, net6.0, net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows (computed).
  • .NET Core: netcoreapp2.0, netcoreapp2.1, netcoreapp2.2, netcoreapp3.0, netcoreapp3.1 (computed).
  • .NET Standard: netstandard2.0 (compatible); netstandard2.1 (computed).
  • .NET Framework: net461, net462, net463, net47, net471, net472, net48, net481 (computed).
  • MonoAndroid: monoandroid (computed).
  • MonoMac: monomac (computed).
  • MonoTouch: monotouch (computed).
  • Tizen: tizen40, tizen60 (computed).
  • Xamarin.iOS: xamarinios (computed).
  • Xamarin.Mac: xamarinmac (computed).
  • Xamarin.TVOS: xamarintvos (computed).
  • Xamarin.WatchOS: xamarinwatchos (computed).


Version history

Version   Downloads   Last updated
2.0.1     3,600       11/4/2021
2.0.0     326         11/3/2021
1.0.0     453         5/24/2021
0.1.4     531         3/31/2021
0.1.3     371         3/26/2021
0.1.2     391         3/22/2021
0.1.1     431         2/28/2021
0.1.0     393         2/24/2021