Confluent.Kafka.Dataflow
2.0.1
Kafka Dataflow
An extension of Confluent's Kafka client for use with System.Threading.Tasks.Dataflow.
Features
- Represent consumers/producers as dataflow blocks.
- Process Kafka messages using a dataflow pipeline.
- Configure transactional producing and exactly-once semantics (EoS) stream processing.
Installation
Add the NuGet package to your project:
$ dotnet add package Confluent.Kafka.Dataflow
Usage
Consuming using ISourceBlock<T>
Use IConsumer<TKey, TValue>.AsSourceBlock(...) to initialize a Kafka message pipeline:
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Confluent.Kafka;
using Confluent.Kafka.Dataflow;

using var consumer = new ConsumerBuilder<string, string>(
    new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "example",
    }).Build();

consumer.Subscribe("my-topic");

// Define a target block to process Kafka messages.
var processor = new ActionBlock<Message<string, string>>(
    message => Console.WriteLine($"Message received: {message.Timestamp}"));

// Initialize source and link to target.
var blockOptions = new DataflowBlockOptions
{
    // It's a good idea to limit buffered messages (in case processing falls behind).
    // Otherwise, all messages are offered as soon as they are available.
    BoundedCapacity = 8,
};

var source = consumer.AsSourceBlock(options: blockOptions);
source.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

// Optionally, request to stop processing.
await Task.Delay(10000);
source.Complete();

// Wait for processing to finish.
await processor.Completion;
consumer.Close();
Committing message offsets
By default, the Kafka client auto-commits periodically, and it stores (queues) each message's offset for the next commit as soon as the message is loaded into memory.
Alternatively, you can set enable.auto.offset.store to false and store offsets manually after processing is finished. This prevents the offsets of unprocessed messages from being committed if processing fails or is interrupted.
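As a minimal sketch of that configuration, assuming the same local broker and group ID as the earlier consumer example, the consumer could be built with automatic offset storing disabled. `EnableAutoOffsetStore` and `EnableAutoCommit` are the `ConsumerConfig` properties that correspond to `enable.auto.offset.store` and `enable.auto.commit`; the broker address, group ID, and topic are placeholders.

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    // Placeholder connection settings; adjust for your cluster.
    BootstrapServers = "localhost:9092",
    GroupId = "example",

    // Keep the periodic background commit...
    EnableAutoCommit = true,

    // ...but only commit offsets that were stored explicitly after processing.
    EnableAutoOffsetStore = false,
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-topic");
```

With this setting, an offset only becomes eligible for the next commit once something stores it, which is the role the commit target plays in the pipeline below.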
// Use a transform block to emit processed messages.
var processor = new TransformBlock<Message<string, string>, Message<string, string>>(
    async message =>
    {
        // Process message asynchronously.
        // ...
        return message;
    },
    new ExecutionDataflowBlockOptions
    {
        // Parallelism is OK as long as order is preserved.
        MaxDegreeOfParallelism = 8,
        EnsureOrdered = true,
    });

// Link the processor to the source and commit target.
var source = consumer.AsSourceBlock(out var commitTarget, options: blockOptions);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

source.LinkTo(processor, linkOptions);
processor.LinkTo(commitTarget, linkOptions);
Producing using ITargetBlock<T>
Use IProducer<TKey, TValue>.AsTargetBlock(...) to direct a message pipeline into a destination Kafka topic:
using System.Threading.Tasks.Dataflow;
using Confluent.Kafka;
using Confluent.Kafka.Dataflow;

using var producer = new ProducerBuilder<string, string>(
    new ProducerConfig
    {
        BootstrapServers = "localhost:9092",
    }).Build();

var target = producer.AsTargetBlock(new TopicPartition("my-topic", Partition.Any));

var generator = new TransformBlock<int, Message<string, string>>(
    i => new Message<string, string>
    {
        Key = i.ToString(),
        Value = $"Value #{i}"
    });

generator.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

for (var i = 0; i < 10; i++)
{
    generator.Post(i);
}

generator.Complete();
await target.Completion;
Transactions and stream processing using TargetBuilder<T>
Kafka supports producing messages and committing offsets in transactions. A common use case is "stream processing," where the offset of a processed message is committed atomically with the messages it produced ("exactly-once semantics").
If you represent all the data you want included in the same transaction as a custom data type, you can use TargetBuilder<T> to create a single composite target:
// Payment and shipping requests should be created transactionally.
class OrderData
{
    public OrderData(PaymentRequest payment, ShippingRequest shipping)
    {
        this.Payment = payment;
        this.Shipping = shipping;
    }

    public PaymentRequest Payment { get; }
    public ShippingRequest Shipping { get; }
}

// Configure mappings to Kafka messages to be produced.
var builder = new TargetBuilder<OrderData>()
    .WithMessages(
        myProducer,
        order => new[] { new Message<Null, PaymentRequest> { Value = order.Payment } },
        new TopicPartition("payment-requests", Partition.Any))
    .WithMessages(
        myProducer,
        order => new[] { new Message<Null, ShippingRequest> { Value = order.Shipping } },
        new TopicPartition("shipping-requests", Partition.Any));

// Optionally, link a Kafka source to include the processed offset for each order.
// Payment and shipping will be created if and only if the order is committed.
builder.AsStream(myConsumer, out var orderSource);

var orderProcessor = new TransformBlock<OrderRequest, OrderData>(this.ProcessOrder);
orderSource.LinkTo(orderProcessor, LinkOptions);

// Finish linking Kafka stream.
var orderTarget = builder.Build();
orderProcessor.LinkTo(orderTarget, LinkOptions);

await orderTarget.Completion;
Product | Compatible and computed target framework versions |
---|---|
.NET | net5.0 is compatible. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Confluent.Kafka (>= 1.8.2)
  - System.Threading.Tasks.Dataflow (>= 5.0.0)
- net5.0
  - Confluent.Kafka (>= 1.8.2)