Ksql.Linq
0.9.8
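.NET CLI:
dotnet add package Ksql.Linq --version 0.9.8
Package Manager:
NuGet\Install-Package Ksql.Linq -Version 0.9.8
PackageReference:
<PackageReference Include="Ksql.Linq" Version="0.9.8" />
Central Package Management (version in Directory.Packages.props, reference in the project file):
<PackageVersion Include="Ksql.Linq" Version="0.9.8" />
<PackageReference Include="Ksql.Linq" />
Paket CLI:
paket add Ksql.Linq --version 0.9.8
Script & Interactive:
#r "nuget: Ksql.Linq, 0.9.8"
File-based apps:
#:package Ksql.Linq@0.9.8
Cake (addin / tool):
#addin nuget:?package=Ksql.Linq&version=0.9.8
#tool nuget:?package=Ksql.Linq&version=0.9.8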
Overview
Ksql.Linq is a C# library that unifies Kafka/ksqlDB and Avro/Schema Registry usage. It lets you control Kafka Streams and ksqlDB in a LINQ style and offers the following capabilities.
- Operate Kafka and ksqlDB through a LINQ-based DSL.
- Design type-safe schemas with Avro and Schema Registry.
- Detect Streams/Tables and Pull/Push modes automatically.
- Support production operations with dead-letter queue (DLQ), retry, and commit helpers.
- Self-healing persistent queries: automatically stabilizes CTAS/CSAS queries by retrying, pre-creating internal topics, and recovering from transient errors.
- Market-schedule–aware OHLC bars (support feature): Generate OHLC bars (e.g., 1s/1m/5m/15m/1h) strictly aligned to exchange trading sessions. The engine skips closed hours and holidays, handles DST correctly, and offers gap policies (skip, carry-forward close, or emit sentinel). Pre-/post-market can be toggled per schedule.
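To make the idea of session-aligned bars concrete, here is a minimal sketch in plain LINQ to Objects. It is not the Ksql.Linq API and not the library's implementation: the 09:00 to 15:00 session, the sample ticks, and all variable names are assumptions chosen for illustration, and holidays, DST, and pre-/post-market handling are left out. It shows only the "skip" gap policy, where a minute without ticks produces no bar.

```csharp
using System;
using System.Linq;

// Hypothetical session window in exchange-local time (assumption for this sketch).
var sessionOpen = new TimeSpan(9, 0, 0);
var sessionClose = new TimeSpan(15, 0, 0);

// Sample ticks (timestamp, price). The 08:59 tick is outside the session.
var ticks = new (DateTime Time, decimal Price)[]
{
    (new DateTime(2025, 1, 6, 8, 59, 30), 99.0m),
    (new DateTime(2025, 1, 6, 9, 0, 5), 100.0m),
    (new DateTime(2025, 1, 6, 9, 0, 40), 101.5m),
    (new DateTime(2025, 1, 6, 9, 0, 55), 100.5m),
    (new DateTime(2025, 1, 6, 9, 2, 10), 102.0m),
};

// Drop out-of-session ticks, then group by the start of each 1-minute bucket.
// Minutes with no ticks (09:01 here) simply yield no bar: the "skip" gap policy.
var bars = ticks
    .Where(t => t.Time.TimeOfDay >= sessionOpen && t.Time.TimeOfDay < sessionClose)
    .OrderBy(t => t.Time)
    .GroupBy(t => new DateTime(t.Time.Year, t.Time.Month, t.Time.Day,
                               t.Time.Hour, t.Time.Minute, 0))
    .Select(g => (
        Start: g.Key,
        Open: g.First().Price,   // first tick in the bucket (ticks are time-ordered)
        High: g.Max(t => t.Price),
        Low: g.Min(t => t.Price),
        Close: g.Last().Price))  // last tick in the bucket
    .ToList();

foreach (var b in bars)
    Console.WriteLine($"{b.Start:HH:mm} O={b.Open} H={b.High} L={b.Low} C={b.Close}");
```

A carry-forward policy would instead emit a bar for the empty 09:01 minute using the previous bar's close for all four prices.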
Release Notes
Version-specific changes (including v0.9.7 and later) are documented in the Release notes section of this NuGet page.
Documentation
For full documentation, advanced usage, and design notes, see the project wiki:
Ksql.Linq Wiki: https://github.com/synthaicode/Ksql.Linq/wiki
Minimal Quick Start
NOTE: In this repo's docker-compose test environment, use 127.0.0.1:39092 (Kafka), 127.0.0.1:18081 (Schema Registry), and 127.0.0.1:18088 (ksqlDB).
The samples below use these ports. Adjust the URLs when using external services.
This document is a minimal quick start guide for Ksql.Linq NuGet consumers.
Prerequisites
- .NET 8 SDK
- Kafka / Schema Registry / ksqlDB running
Minimal appsettings.json
{
  "KsqlDsl": {
    "Common": {
      "BootstrapServers": "127.0.0.1:39092",
      "ClientId": "my-app"
    },
    "SchemaRegistry": {
      "Url": "http://127.0.0.1:18081"
    },
    "KsqlDbUrl": "http://127.0.0.1:18088"
  }
}
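When pointing at external services, one option is to layer an override on top of the base settings rather than editing the file. The sketch below uses only Microsoft.Extensions.Configuration: later providers win over earlier ones, and ":" separates nesting levels in a key. The in-memory dictionaries stand in for appsettings.json and for an override source, and the host name ksqldb.example.internal is a made-up placeholder.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Base values, equivalent to the appsettings.json shown above.
var baseSettings = new Dictionary<string, string?>
{
    ["KsqlDsl:Common:BootstrapServers"] = "127.0.0.1:39092",
    ["KsqlDsl:Common:ClientId"] = "my-app",
    ["KsqlDsl:SchemaRegistry:Url"] = "http://127.0.0.1:18081",
    ["KsqlDsl:KsqlDbUrl"] = "http://127.0.0.1:18088",
};

// Override for an external ksqlDB endpoint (placeholder URL, assumption).
var overrides = new Dictionary<string, string?>
{
    ["KsqlDsl:KsqlDbUrl"] = "http://ksqldb.example.internal:8088",
};

// Providers added later take precedence for keys they define.
var cfg = new ConfigurationBuilder()
    .AddInMemoryCollection(baseSettings)
    .AddInMemoryCollection(overrides)
    .Build();

// Fail fast if any endpoint is missing before constructing a context.
var required = new[]
{
    "KsqlDsl:Common:BootstrapServers",
    "KsqlDsl:SchemaRegistry:Url",
    "KsqlDsl:KsqlDbUrl",
};
var missing = new List<string>();
foreach (var key in required)
{
    if (string.IsNullOrWhiteSpace(cfg[key])) missing.Add(key);
}

Console.WriteLine(missing.Count == 0
    ? $"ksqlDB endpoint: {cfg["KsqlDsl:KsqlDbUrl"]}"
    : $"Missing settings: {string.Join(", ", missing)}");
```

In a real application the first provider would typically be AddJsonFile("appsettings.json"), with the override coming from whichever source suits the deployment.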
Minimal code (produce / consume)
using Ksql.Linq;
using Ksql.Linq.Core.Attributes;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

// In a single file, top-level statements must come before type declarations.
var cfg = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json")
    .Build();

await using var ctx = new AppCtx(
    cfg,
    LoggerFactory.Create(b => b.AddConsole())
);

// Produce one message to the "quickstart-basic" topic.
await ctx.Hellos.AddAsync(new Hello
{
    Id = 1,
    Text = "Hello Ksql.Linq"
});

// Consume messages and print the text of each one.
await ctx.Hellos.ForEachAsync(m =>
{
    Console.WriteLine(m.Text);
    return Task.CompletedTask;
});

// Entity mapped to the Kafka topic "quickstart-basic".
[KsqlTopic("quickstart-basic")]
public class Hello
{
    public int Id { get; set; }
    public string Text { get; set; } = "";
}

// Context that registers the entity and exposes it as an EventSet.
public class AppCtx : KsqlContext
{
    public AppCtx(IConfiguration cfg, ILoggerFactory? lf = null)
        : base(cfg, lf) { }

    public EventSet<Hello> Hellos { get; set; } = null!;

    protected override void OnModelCreating(IModelBuilder b)
        => b.Entity<Hello>();
}
Notes
Use KsqlDsl:Topics.{name}.Creation.* to control partitions / retention per topic.
For secured clusters, configure SecurityProtocol / Sasl* under KsqlDsl:Common.
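As a rough illustration of the two notes above, the sketch below lays out what such settings might look like as configuration keys and reads them back with Microsoft.Extensions.Configuration. Only the KsqlDsl:Topics, Creation, KsqlDsl:Common, and SecurityProtocol / Sasl* names come from the notes. The leaf names NumPartitions and Configs:retention.ms are assumptions, and SaslMechanism, SaslUsername, and SaslPassword are borrowed from Confluent.Kafka's ClientConfig property names on the assumption that Ksql.Linq binds the same names. Check the wiki for the exact keys before relying on them.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

var settings = new Dictionary<string, string?>
{
    // Per-topic creation settings. Leaf key names are ASSUMED, not confirmed.
    ["KsqlDsl:Topics:quickstart-basic:Creation:NumPartitions"] = "3",
    ["KsqlDsl:Topics:quickstart-basic:Creation:Configs:retention.ms"] = "604800000",

    // Security settings under KsqlDsl:Common. Names mirror Confluent.Kafka's
    // ClientConfig properties (ASSUMED to be what Ksql.Linq reads).
    ["KsqlDsl:Common:SecurityProtocol"] = "SaslSsl",
    ["KsqlDsl:Common:SaslMechanism"] = "Plain",
    ["KsqlDsl:Common:SaslUsername"] = "<username>",
    ["KsqlDsl:Common:SaslPassword"] = "<password>",
};

var cfg = new ConfigurationBuilder()
    .AddInMemoryCollection(settings)
    .Build();

// Enumerate configured topics and read back their creation settings.
foreach (var topic in cfg.GetSection("KsqlDsl:Topics").GetChildren())
{
    var creation = topic.GetSection("Creation");
    Console.WriteLine(
        $"{topic.Key}: partitions={creation["NumPartitions"]}, " +
        $"retention.ms={creation["Configs:retention.ms"]}");
}

var partitions = int.Parse(
    cfg["KsqlDsl:Topics:quickstart-basic:Creation:NumPartitions"]!);
var protocol = cfg["KsqlDsl:Common:SecurityProtocol"];
Console.WriteLine($"SecurityProtocol={protocol}");
```

The same keys map to nested JSON objects in appsettings.json, with each ":" becoming one level of nesting. Credentials are better supplied from a secret store or environment-specific source than committed to the file.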
| Product | Compatible and computed target framework versions |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies (net8.0):
- Apache.Avro (>= 1.12.0)
- Confluent.Kafka (>= 2.12.0)
- Confluent.SchemaRegistry (>= 2.12.0)
- Confluent.SchemaRegistry.Serdes.Avro (>= 2.12.0)
- Microsoft.Extensions.Configuration (>= 8.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Configuration.Json (>= 8.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
- Microsoft.Extensions.Logging (>= 8.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.2)
- Microsoft.Extensions.Logging.Configuration (>= 8.0.0)
- Microsoft.Extensions.Logging.Console (>= 8.0.0)
- Microsoft.Extensions.Logging.Debug (>= 8.0.0)
- Microsoft.Extensions.Options (>= 8.0.2)
- Streamiz.Kafka.Net (>= 1.7.1)
- Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro (>= 1.7.1)