Net.Kafka.ReactiveOrm
1.0.1
Net.Kafka.ReactiveOrm is a lightweight Reactive Object Mapper (ROM) for Kafka in .NET.
It maps Kafka topics to strongly typed, reactive entities inspired by Entity Framework (DbContext / DbSet<T>), enabling declarative subscription and publishing with LINQ-style syntax and reactive patterns.
Key Features
- Entity Mapping via Attributes: Use [Topic] attributes to bind C# classes to Kafka topics.
- Reactive Subscriptions: Subscribe reactively using IObservable<T>, .Where(), .Subscribe().
- Declarative Publishing: Publish entities directly with .Publish(entity) without dealing with low-level Kafka APIs.
- Context API: Manage your Kafka topics via a KafkaOrmContext, similar to EF's DbContext.
- LINQ Filtering: Filter and transform streams with .Where(), .Select().
- Broker Agnostic: Abstracts the Kafka client, compatible with any standard Kafka broker.
Use Case Scenarios
- Real-time IoT data processing
- Industry 4.0 monitoring & control
- Edge analytics and transformations
- Reactive home automation systems
Architecture Overview
Component | Description |
---|---|
TopicSet<T> | Represents a typed topic with publish/subscribe capabilities |
KafkaOrmContext | Central registry managing all topic sets |
IKafkaBus | Abstraction over the Kafka client API |
TopicAttribute | Declares the Kafka topic and consumer group for an entity |
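One benefit of placing a bus abstraction between the topic sets and the Kafka client is that the bus can be swapped for a test double. The members of the real IKafkaBus are not shown in this document, so the sketch below uses a hypothetical interface, IDemoBus, and a hypothetical in-memory implementation built on Subject<T> from System.Reactive. It is an illustration of the idea under those assumptions, not the library's design.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var bus = new InMemoryBus();
string? received = null;
using var sub = bus.Subscribe("orders.created").Subscribe(p => received = p);
bus.Publish("orders.created", "{\"Id\":1}");
Console.WriteLine(received); // the payload published above

// Hypothetical bus abstraction (assumed shape, not the real IKafkaBus).
public interface IDemoBus
{
    void Publish(string topic, string payload);
    IObservable<string> Subscribe(string topic);
}

// In-memory implementation that can stand in for a broker in tests.
public sealed class InMemoryBus : IDemoBus
{
    private readonly Dictionary<string, Subject<string>> _topics = new();

    // Returns the subject for a topic, creating it on first use.
    private Subject<string> Topic(string name)
    {
        if (!_topics.TryGetValue(name, out var subject))
            _topics[name] = subject = new Subject<string>();
        return subject;
    }

    public void Publish(string topic, string payload) => Topic(topic).OnNext(payload);

    public IObservable<string> Subscribe(string topic) => Topic(topic);
}
```

Messages published before anyone subscribes are dropped in this sketch, which differs from Kafka's retained log; it is only meant for exercising subscriber logic.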
Philosophy
Instead of low-level Kafka consumers and producers with string topics and byte arrays, Net.Kafka.ReactiveOrm provides a strongly typed, reactive, and declarative API that treats Kafka topics as first-class reactive data streams.
It minimizes boilerplate and lets you focus on your domain logic using familiar C# idioms.
Developer Documentation: Net.Kafka.ReactiveOrm
1. Introduction
Net.Kafka.ReactiveOrm simplifies Kafka integration by mapping topics to observable entity streams, enabling LINQ-style reactive subscriptions and declarative publishing.
2. Installation & Setup
dotnet add package Net.Kafka.ReactiveOrm
dotnet add package Confluent.Kafka
dotnet add package System.Reactive
3. Defining Kafka Entities
using Net.Kafka.ReactiveOrm.Attributes;

[Topic("orders.created", ConsumerGroup = "order-service")]
public class OrderCreated
{
    public int Id { get; set; }
    public string Customer { get; set; } = "";
    public double Amount { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
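To illustrate how an attribute can bind a class to a topic, the sketch below defines a stand-in TopicAttribute and reads it back with reflection. The attribute class shown here, the TopicMetadata.Describe helper, and its output format are hypothetical and written only for this example; the library's real attribute lives in Net.Kafka.ReactiveOrm.Attributes and its internals may differ.

```csharp
#nullable enable
using System;
using System.Reflection;

// Resolve the topic metadata declared on the entity type.
Console.WriteLine(TopicMetadata.Describe(typeof(OrderCreated))); // orders.created (order-service)

// Hypothetical stand-in for the library's [Topic] attribute (assumed shape).
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Property)]
public sealed class TopicAttribute : Attribute
{
    public TopicAttribute(string name) => Name = name;
    public string Name { get; }
    public string? ConsumerGroup { get; set; }
}

[Topic("orders.created", ConsumerGroup = "order-service")]
public class OrderCreated
{
    public int Id { get; set; }
}

public static class TopicMetadata
{
    // Returns "topic (group)" for a type carrying [Topic]; throws if it is missing.
    public static string Describe(Type entityType)
    {
        var attr = entityType.GetCustomAttribute<TopicAttribute>()
            ?? throw new InvalidOperationException($"{entityType.Name} has no [Topic] attribute.");
        return $"{attr.Name} ({attr.ConsumerGroup ?? "no group"})";
    }
}
```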
4. Creating the Kafka Context
using Net.Kafka.ReactiveOrm;

public class KafkaContext : KafkaOrmContext
{
    [Topic("orders.created", ConsumerGroup = "order-service")]
    public TopicSet<OrderCreated> OrdersCreated { get; private set; }

    public KafkaContext(IKafkaBus bus) : base(bus)
    {
        // TopicSets auto-initialized by base
    }
}
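The comment "TopicSets auto-initialized by base" suggests that the base constructor discovers the TopicSet<T> properties itself. One plausible way to do that is reflection. The sketch below uses simplified stand-ins (DemoTopicSet<T>, DemoContext, OrdersContext) that are hypothetical and are not the library's implementation; it only shows that a base class can populate properties with private setters on a derived type.

```csharp
#nullable enable
using System;
using System.Reflection;

var ctx = new OrdersContext();
Console.WriteLine(ctx.Orders is not null); // True: the base constructor filled the property

// Hypothetical stand-in for TopicSet<T>; it only records its message type.
public class DemoTopicSet<T>
{
    public Type MessageType => typeof(T);
}

// Hypothetical stand-in for KafkaOrmContext.
public abstract class DemoContext
{
    protected DemoContext()
    {
        const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic;
        foreach (var prop in GetType().GetProperties(flags))
        {
            var type = prop.PropertyType;
            if (!type.IsGenericType || type.GetGenericTypeDefinition() != typeof(DemoTopicSet<>))
                continue;
            // Reflection can invoke a private setter, so "private set" still works.
            prop.SetValue(this, Activator.CreateInstance(type));
        }
    }
}

public class OrdersContext : DemoContext
{
    public DemoTopicSet<string> Orders { get; private set; } = null!;
}
```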
5. Subscribing to Messages
var context = new KafkaContext(new KafkaBus("localhost:9092", "order-service"));

context.OrdersCreated
    .Where(o => o.Amount > 100)
    .Subscribe(o => Console.WriteLine($"Order {o.Id} from {o.Customer} - {o.Amount}"));
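Because the subscription API follows the IObservable<T> pattern, filtering and projection logic can be tried without a broker. The sketch below uses a Subject<T> from System.Reactive as a stand-in for context.OrdersCreated; the Order record and the OrderPipeline.LargeOrderSummaries helper are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var orders = new Subject<Order>();               // stand-in for the Kafka-backed stream
var seen = new List<string>();

using (OrderPipeline.LargeOrderSummaries(orders, 100).Subscribe(s => seen.Add(s)))
{
    orders.OnNext(new Order(1, "alice", 50));    // filtered out by Where
    orders.OnNext(new Order(2, "bob", 250.5));   // passes the filter
}
orders.OnNext(new Order(3, "carol", 500));       // ignored: subscription already disposed

Console.WriteLine(string.Join(", ", seen));      // 2:bob

public record Order(int Id, string Customer, double Amount);

public static class OrderPipeline
{
    // Keeps orders above the threshold and projects each one to a short summary.
    public static IObservable<string> LargeOrderSummaries(IObservable<Order> source, double minAmount) =>
        source.Where(o => o.Amount > minAmount)
              .Select(o => $"{o.Id}:{o.Customer}");
}
```

Keeping the pipeline in a method that accepts any IObservable<T> makes it easy to unit test with a Subject<T> and then apply to the real topic set.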
6. Publishing Messages
await context.OrdersCreated.Publish(new OrderCreated
{
    Id = 123,
    Customer = "JoeDevSharp",
    Amount = 250.5
});
7. Full Console Example
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Net.Kafka.ReactiveOrm;

class Program
{
    static async Task Main()
    {
        using var bus = new KafkaBus("localhost:9092", "order-service");
        var context = new KafkaContext(bus);

        // Reactive subscription filtering by amount
        var subscription = context.OrdersCreated
            .Where(o => o.Amount > 100)
            .Subscribe(o =>
            {
                Console.WriteLine($"[RECEIVED] Order {o.Id} from {o.Customer} with amount {o.Amount}");
            });

        // Publish a new order
        await context.OrdersCreated.Publish(new OrderCreated
        {
            Id = 1,
            Customer = "JoeDevSharp",
            Amount = 150.75
        });

        Console.WriteLine("[PUBLISHED] New order sent.");
        Console.WriteLine("Press Enter to exit...");
        Console.ReadLine();

        subscription.Dispose();
    }
}
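Kafka Local Setup (Docker)
# Both containers share a user-defined network so Kafka can reach ZooKeeper by name;
# "localhost" inside the Kafka container would point at the Kafka container itself.
docker network create kafka-net
# ZooKeeper mode is assumed here, so the images are pinned to a 7.x release that still supports it.
docker run -d --name zookeeper --network kafka-net -p 2181:2181 \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:7.6.1
docker run -d --name kafka --network kafka-net -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:7.6.1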
Best Practices
- Ensure Kafka is running before starting the app.
- Filter aggressively with .Where() to reduce message processing overhead.
- Dispose subscriptions when no longer needed.
- Use consumer groups to balance load among instances.
FAQ
Q: Can I use multiple consumer groups? Yes. Specify the consumer group in the [Topic] attribute or in the KafkaBus constructor.
Q: Is this compatible with Kafka cloud providers? Yes, as long as they support the standard Kafka protocol and you provide the connection configuration.
Q: How does serialization work? By default, message payloads are serialized as JSON.
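To give a rough idea of what a JSON payload for an entity could look like, the sketch below round-trips an OrderCreated instance with System.Text.Json. Which serializer and which options the library actually uses are not stated in this document, so System.Text.Json with default settings is an assumption, and the OrderJson helper is a hypothetical name.

```csharp
using System;
using System.Text.Json;

var order = new OrderCreated
{
    Id = 1,
    Customer = "JoeDevSharp",
    Amount = 150.75,
    CreatedAt = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc)
};

string json = OrderJson.Serialize(order);
Console.WriteLine(json);                        // a JSON object with Id, Customer, Amount, CreatedAt

OrderCreated copy = OrderJson.Deserialize(json);
Console.WriteLine(copy.Customer);               // JoeDevSharp

public class OrderCreated
{
    public int Id { get; set; }
    public string Customer { get; set; } = "";
    public double Amount { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}

public static class OrderJson
{
    // Assumed serializer: System.Text.Json with default options.
    public static string Serialize(OrderCreated order) => JsonSerializer.Serialize(order);

    public static OrderCreated Deserialize(string json) =>
        JsonSerializer.Deserialize<OrderCreated>(json)
        ?? throw new JsonException("Payload was JSON null.");
}
```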
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies (net8.0):
- Confluent.Kafka (>= 2.11.0)
- System.Reactive (>= 6.0.1)
Version | Downloads | Last Updated |
---|---|---|
1.0.1 | 68 | 7/5/2025 |