ksqlDb.RestApi.Client
3.2.0
See the version list below for details.
dotnet add package ksqlDb.RestApi.Client --version 3.2.0
NuGet\Install-Package ksqlDb.RestApi.Client -Version 3.2.0
<PackageReference Include="ksqlDb.RestApi.Client" Version="3.2.0" />
paket add ksqlDb.RestApi.Client --version 3.2.0
#r "nuget: ksqlDb.RestApi.Client, 3.2.0"
// Install ksqlDb.RestApi.Client as a Cake Addin
#addin nuget:?package=ksqlDb.RestApi.Client&version=3.2.0
// Install ksqlDb.RestApi.Client as a Cake Tool
#tool nuget:?package=ksqlDb.RestApi.Client&version=3.2.0
This package generates KSQL push and pull queries from LINQ queries in your .NET C# code. It lets you perform server-side filtering, projection, limiting, and other operations on push notifications using ksqlDB push queries, so you can continually process computations over unbounded (potentially never-ending) streams of data. It also lets you execute SQL statements via the REST API, such as inserting records into streams and creating tables and types, and run admin operations such as listing streams.
ksqlDB.RestApi.Client is a contribution to Confluent ksqldb-clients
Install with NuGet package manager:
Install-Package ksqlDB.RestApi.Client
or with .NET CLI
dotnet add package ksqlDB.RestApi.Client
This adds a <PackageReference> to your csproj file, similar to the following:
<PackageReference Include="ksqlDB.RestApi.Client" Version="3.0.0" />
An alternative option is to use the Protobuf content type:
dotnet add package ksqlDB.RestApi.Client.ProtoBuf
The following example can be tried out with a .NET interactive Notebook:
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;
var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
{
  ShouldPluralizeFromItemName = true
};
await using var context = new KSqlDBContext(contextOptions);
using var subscription = context.CreateQueryStream<Tweet>()
  .WithOffsetResetPolicy(AutoOffsetReset.Latest)
  .Where(p => p.Message != "Hello world" || p.Id == 1)
  .Select(l => new { l.Message, l.Id })
  .Take(2)
  .Subscribe(
    tweetMessage =>
    {
      Console.WriteLine($"{nameof(Tweet)}: {tweetMessage.Id} - {tweetMessage.Message}");
    },
    error => { Console.WriteLine($"Exception: {error.Message}"); },
    () => Console.WriteLine("Completed"));
Console.WriteLine("Press any key to stop the subscription");
Console.ReadKey();
public class Tweet : Record
{
  public int Id { get; set; }
  public string Message { get; set; }
}
An entity class in ksqlDB.RestApi.Client represents the structure of a table or stream. An instance of the class represents a record in that stream, and its properties are mapped to the corresponding columns.
The LINQ code in the sample above is equivalent to this KSQL query:
SELECT Message, Id
FROM Tweets
WHERE Message != 'Hello world' OR Id = 1
EMIT CHANGES
LIMIT 2;
In the provided C# code snippet, most of the code executes on the server side except for the IQbservable<TEntity>.Subscribe extension method. This method is responsible for subscribing to your ksqlDB stream, which is created using the following approach:
using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.Api.Client.Samples.Models;
EntityCreationMetadata metadata = new()
{
  KafkaTopic = nameof(Tweet),
  Partitions = 3,
  Replicas = 3
};
var httpClient = new HttpClient()
{
  BaseAddress = new Uri(@"http://localhost:8088")
};
var httpClientFactory = new HttpClientFactory(httpClient);
var restApiClient = new KSqlDbRestApiClient(httpClientFactory);
var httpResponseMessage = await restApiClient.CreateOrReplaceStreamAsync<Tweet>(metadata);
CreateOrReplaceStreamAsync executes the following statement:
CREATE OR REPLACE STREAM Tweets (
Id INT,
Message VARCHAR
) WITH ( KAFKA_TOPIC='Tweet', VALUE_FORMAT='Json', PARTITIONS='3', REPLICAS='3' );
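To make the mapping from entity properties to columns concrete, here is a minimal sketch of how such a statement could be assembled with reflection. It is an illustration only, not the library's implementation: `CreateStreamSketch`, its `Build` method, and the small type table are hypothetical names introduced here, and `Tweet` is declared as a plain class so the snippet needs no package reference.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Tweet
{
    public int Id { get; set; }
    public string Message { get; set; }
}

// Hypothetical helper for illustration; not part of ksqlDB.RestApi.Client.
public static class CreateStreamSketch
{
    // Assumed, deliberately small CLR-to-ksqlDB type mapping.
    private static readonly Dictionary<Type, string> KsqlTypes = new Dictionary<Type, string>
    {
        [typeof(int)] = "INT",
        [typeof(long)] = "BIGINT",
        [typeof(double)] = "DOUBLE",
        [typeof(bool)] = "BOOLEAN",
        [typeof(string)] = "VARCHAR"
    };

    public static string Build<T>(string streamName, string topic, int partitions, int replicas)
    {
        // Each public property becomes one "<name> <type>" column definition.
        var columns = typeof(T).GetProperties()
            .Select(p => $"  {p.Name} {KsqlTypes[p.PropertyType]}");

        return $"CREATE OR REPLACE STREAM {streamName} (\n"
             + string.Join(",\n", columns)
             + $"\n) WITH ( KAFKA_TOPIC='{topic}', VALUE_FORMAT='Json', "
             + $"PARTITIONS='{partitions}', REPLICAS='{replicas}' );";
    }
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(CreateStreamSketch.Build<Tweet>("Tweets", nameof(Tweet), 3, 3));
    }
}
```

A property type missing from the table throws a `KeyNotFoundException` in this sketch; a real mapping would also have to cover arrays, maps, structs, and the time types.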
Execute the following insert statements to publish messages using your ksqldb-cli:
docker exec -it $(docker ps -q -f name=ksqldb-cli) ksql http://ksqldb-server:8088
INSERT INTO tweets (id, message) VALUES (1, 'Hello world');
INSERT INTO tweets (id, message) VALUES (2, 'ksqlDB rulez!');
or insert a record from C#:
var responseMessage = await new KSqlDbRestApiClient(httpClientFactory)
.InsertIntoAsync(new Tweet { Id = 2, Message = "ksqlDB rulez!" });
or with KSqlDbContext:
await using var context = new KSqlDBContext(ksqlDbUrl);
context.Add(new Tweet { Id = 1, Message = "Hello world" });
context.Add(new Tweet { Id = 2, Message = "ksqlDB rulez!" });
var saveChangesResponse = await context.SaveChangesAsync();
Sample projects can be found under the Samples solution folder in ksqlDB.RestApi.Client.sln
External dependencies:
- kafka broker and ksqlDB-server 0.14.0
- the solution requires Docker desktop and Visual Studio 2019
- .NET 6.0
Clone the repository
git clone https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet.git
CD to Samples
CD Samples\ksqlDB.RestApi.Client.Sample\
Run in the command line:
docker compose up -d
ASP.NET Blazor server-side sample:
In Blazor Server, the application logic and UI rendering occur on the server. The client's web browser receives updates and UI changes through a SignalR connection.
This ensures smooth integration with the ksqlDB.RestApi.Client library, allowing the Kafka broker and ksqlDB to remain hidden from direct exposure to clients.
- Set docker-compose.csproj as the startup project in InsideOut.sln for an embedded Kafka Connect integration and stream processing examples.
IQbservable<T> extension methods
As depicted below, IObservable<T> is the dual of IEnumerable<T>, and IQbservable<T> is the dual of IQueryable<T>. In all four cases, LINQ providers use deferred execution.
While the first two (IObservable<T> and IEnumerable<T>) are executed locally, the latter two (IQbservable<T> and IQueryable<T>) are executed server side. Server-side execution is possible thanks to traversing ASTs (Abstract Syntax Trees) with visitors. The KSqlDbProvider will create the KSQL syntax for you from expression trees and pass it along to ksqlDB.
Both IObservable<T> and IQbservable<T> represent push-based sequences of asynchronous and potentially infinite events, while IEnumerable<T> and IQueryable<T> represent collections or pull-based sequences of items that can be iterated or queried, respectively.
<img src="https://www.codeproject.com/KB/cs/646361/WhatHowWhere.jpg" />
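The idea of translating an expression tree with a visitor can be sketched using only the base class library. The example below is a simplified illustration and not the library's KSqlDbProvider: `WhereClauseVisitor` is a hypothetical class that handles just member access, constants, and a few binary operators, and `Tweet` is a plain class so the snippet stands alone.

```csharp
using System;
using System.Linq.Expressions;
using System.Text;

public class Tweet
{
    public int Id { get; set; }
    public string Message { get; set; }
}

// Hypothetical, heavily simplified translator; NOT the library's KSqlDbProvider.
public sealed class WhereClauseVisitor : ExpressionVisitor
{
    private readonly StringBuilder sql = new StringBuilder();

    public static string Translate<T>(Expression<Func<T, bool>> predicate)
    {
        var visitor = new WhereClauseVisitor();
        visitor.Visit(predicate.Body); // walk the tree instead of compiling it
        return "WHERE " + visitor.sql;
    }

    protected override Expression VisitBinary(BinaryExpression node)
    {
        Visit(node.Left);
        sql.Append(node.NodeType switch
        {
            ExpressionType.Equal => " = ",
            ExpressionType.NotEqual => " != ",
            ExpressionType.AndAlso => " AND ",
            ExpressionType.OrElse => " OR ",
            _ => throw new NotSupportedException(node.NodeType.ToString())
        });
        Visit(node.Right);
        return node;
    }

    protected override Expression VisitMember(MemberExpression node)
    {
        // Assumes the member is a property of the lambda parameter (a column).
        sql.Append(node.Member.Name);
        return node;
    }

    protected override Expression VisitConstant(ConstantExpression node)
    {
        // String literals are quoted; other constants are written as-is.
        sql.Append(node.Value is string s ? $"'{s}'" : node.Value?.ToString());
        return node;
    }
}

public static class Program
{
    public static void Main()
    {
        var clause = WhereClauseVisitor.Translate<Tweet>(
            p => p.Message != "Hello world" || p.Id == 1);
        Console.WriteLine(clause); // WHERE Message != 'Hello world' OR Id = 1
    }
}
```

Because the lambda is captured as an `Expression<Func<T, bool>>` rather than a delegate, nothing is evaluated locally; the visitor only reads the tree's shape. Captured variables, method calls, and operator precedence are left out of this sketch.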
List of supported push query extension methods:
Register the KSqlDbContext
IKSqlDBContext and IKSqlDbRestApiClient can be provided with dependency injection. These services can be registered during app startup, and components that require them receive them via constructor parameters.
To register KSqlDbContext as a service, open Program.cs and add the lines shown below to the ConfigureServices method, or see the workshop for more details:
using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
IHost host = Host.CreateDefaultBuilder(args)
  .ConfigureServices(services =>
  {
    var ksqlDbUrl = @"http://localhost:8088";
    services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
      options =>
      {
        var setupParameters = options.UseKSqlDb(ksqlDbUrl);
        setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);
      }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);
  })
  .Build();
await host.RunAsync();
Setting query parameters
Default settings: 'auto.offset.reset' is set to 'earliest' by default. New parameters can be added or existing ones changed in the following manner:
var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088");
contextOptions.QueryStreamParameters["auto.offset.reset"] = "latest";
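For orientation, the sketch below shows how a query and its parameters could be combined into a JSON request body. It assumes the `sql` and `properties` field names that the ksqlDB HTTP API documents for its `/query-stream` endpoint; `QueryPayloadSketch` is a hypothetical helper introduced for this example, and the library's own request types and serialization may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration; not the library's request model.
public static class QueryPayloadSketch
{
    public static string Build(string sql, IDictionary<string, string> properties)
    {
        // Assumed body shape: { "sql": "...", "properties": { ... } }
        var payload = new Dictionary<string, object>
        {
            ["sql"] = sql,
            ["properties"] = properties
        };
        return JsonSerializer.Serialize(payload);
    }
}

public static class Program
{
    public static void Main()
    {
        // Start from the default and override it, mirroring the snippet above.
        var parameters = new Dictionary<string, string>
        {
            ["auto.offset.reset"] = "earliest"
        };
        parameters["auto.offset.reset"] = "latest";

        Console.WriteLine(
            QueryPayloadSketch.Build("SELECT * FROM Tweets EMIT CHANGES;", parameters));
    }
}
```

Keeping the parameters in a dictionary means an override simply replaces the value stored under the same key, so the last assignment wins.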
Overriding stream names
Stream names are generated from the generic record types. They are pluralized with the Pluralize.NET package.
By default, generated from-item names such as stream and table names are pluralized. This behavior can be switched off with the ShouldPluralizeFromItemName configuration option.
context.CreateQueryStream<Person>();
FROM People
This can be disabled:
var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088")
{
ShouldPluralizeFromItemName = false
};
new KSqlDBContext(contextOptions).CreateQueryStream<Person>();
FROM Person
Setting an arbitrary stream name (from_item name):
context.CreateQueryStream<Tweet>("custom_topic_name");
FROM custom_topic_name
Aggregation functions
List of supported ksqlDB aggregation functions:
List of supported data types:
- Supported data types mapping
- Structs
- Maps
- Time types DATE, TIME and TIMESTAMP
- System.Guid as ksqlDB VARCHAR type
List of supported Joins:
- RightJoin
- Full Outer Join
- Left Join
- Inner Joins
- Multiple joins with query comprehension syntax (GroupJoin, SelectMany, DefaultIfEmpty)
List of supported pull query extension methods:
List of supported ksqlDB SQL statements:
- Pause and resume persistent queries
- Added support for extracting field names and values (for insert and select statements)
- Assert topics
- Assert schemas
- Rename stream or table column names with the JsonPropertyNameAttribute
- Create source streams and tables
- InsertIntoAsync
- Connectors
- Drop a stream
- Drop type
- Creating types
- Execute statement async
- PartitionBy
- Terminate push queries
- Drop a table
- Creating connectors
- Get topics
- Getting queries and termination of persistent queries
- Execute statements
- Create or replace table statements
- Creating streams and tables
- Get streams
- Get tables
KSqlDbContext
- Dependency injection with ServiceCollection
- Creating query streams
- Creating queries
- AddDbContext and AddDbContextFactory
- Logging info and ConfigureKSqlDb
- Basic auth
- Add and SaveChangesAsync
- KSqlDbContextOptionsBuilder
Config
Operators
- Operator LIKE
- Operator IN
- Operator BETWEEN
- Operator CASE
- Arithmetic operations on columns
- Lexical precedence
- WHERE IS NULL, IS NOT NULL
Data definitions
Miscellaneous
- Change data capture
- List of breaking changes
- Operators
- Invocation functions
- Setting JsonSerializerOptions
- Kafka stream processing example
- ksqlDB streams and tables
Functions
- String functions
- Numeric functions
- Date and time functions
- Lambda functions (Invocation functions) - Maps
LinqPad samples
NuGet
https://www.nuget.org/packages/ksqlDB.RestApi.Client/
ksqldb links
Acknowledgements:
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Microsoft.Extensions.DependencyInjection (>= 7.0.0)
  - Microsoft.Extensions.Http (>= 7.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
  - Pluralize.NET (>= 1.0.2)
  - System.Interactive.Async (>= 6.0.1)
  - System.Reactive (>= 6.0.0)
  - System.Text.Json (>= 7.0.0)
- net6.0
  - Microsoft.Extensions.DependencyInjection (>= 7.0.0)
  - Microsoft.Extensions.Http (>= 7.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
  - Pluralize.NET (>= 1.0.2)
  - System.Interactive.Async (>= 6.0.1)
  - System.Reactive (>= 6.0.0)
  - System.Text.Json (>= 7.0.0)
- net7.0
  - Microsoft.Extensions.DependencyInjection (>= 7.0.0)
  - Microsoft.Extensions.Http (>= 7.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
  - Pluralize.NET (>= 1.0.2)
  - System.Interactive.Async (>= 6.0.1)
  - System.Reactive (>= 6.0.0)
  - System.Text.Json (>= 7.0.0)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on ksqlDb.RestApi.Client:
Package | Downloads |
---|---|
SqlServer.Connector
SqlServer.Connector is a C# / .NET client API for consuming row-level table changes (CDC - Change Data Capture) from a SQL Server database with the Debezium connector streaming platform. With Kafka Connect and Debezium connectors you can stream data to and from Kafka and use it as your integral component of the ETL pipeline or create materialized views (caches) where it is needed, precompute the results of a query and store them for fast read access. See also https://www.nuget.org/packages/ksqlDb.RestApi.Client/ Targets .NET 5, .NET Core 3.1 and .NET Standard 2.0. Documentation for the library can be found at https://github.com/tomasfabian/ksqlDb.RestApi.Client-DotNet/blob/main/SqlServer.Connector/Wiki.md. |
|
ksqlDb.RestApi.Client.ProtoBuf
ksqlDB.RestApi.Client.ProtoBuf adds support for Protobuf content type. ksqlDB.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push queries. Targets .NET 6, .NET 7, and .NET 8. Documentation for the library can be found at https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/blob/main/README.md. |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
6.5.0 | 1,161 | 10/27/2024 |
6.4.0 | 1,862 | 10/15/2024 |
6.3.0 | 531 | 10/13/2024 |
6.3.0-rc.1 | 53 | 10/11/2024 |
6.2.1 | 4,626 | 9/9/2024 |
6.2.0 | 4,562 | 8/3/2024 |
6.1.0 | 11,229 | 6/7/2024 |
6.1.0-rc.2 | 62 | 6/1/2024 |
6.1.0-rc.1 | 63 | 5/31/2024 |
6.0.2 | 1,368 | 5/29/2024 |
6.0.1 | 3,186 | 5/10/2024 |
6.0.0 | 1,797 | 4/30/2024 |
6.0.0-rc.2 | 70 | 4/26/2024 |
6.0.0-rc.1 | 49 | 4/24/2024 |
5.1.0 | 767 | 4/24/2024 |
5.0.0 | 2,272 | 4/15/2024 |
5.0.0-rc.3 | 62 | 4/11/2024 |
5.0.0-rc.2 | 66 | 4/6/2024 |
5.0.0-rc.1 | 68 | 4/5/2024 |
4.0.2 | 6,503 | 4/9/2024 |
4.0.1 | 2,581 | 3/22/2024 |
4.0.0 | 1,692 | 3/19/2024 |
4.0.0-rc.4 | 67 | 3/13/2024 |
4.0.0-rc.3 | 102 | 3/10/2024 |
4.0.0-rc.2 | 59 | 3/10/2024 |
4.0.0-rc.1 | 66 | 3/9/2024 |
3.6.2 | 1,646 | 3/9/2024 |
3.6.1 | 6,538 | 3/4/2024 |
3.6.0 | 374 | 3/1/2024 |
3.6.0-rc.2 | 161 | 2/21/2024 |
3.5.0 | 5,024 | 2/1/2024 |
3.4.0 | 5,775 | 12/6/2023 |
3.3.0 | 4,341 | 11/15/2023 |
3.2.2 | 6,328 | 9/12/2023 |
3.2.1 | 124,823 | 8/25/2023 |
3.2.0 | 4,630 | 7/14/2023 |
3.1.0 | 4,263 | 6/17/2023 |
3.0.1 | 7,549 | 3/30/2023 |
3.0.0 | 6,373 | 2/25/2023 |
3.0.0-rc.2 | 421 | 2/13/2023 |
3.0.0-rc.1 | 121 | 2/11/2023 |
2.7.0 | 23,249 | 1/14/2023 |
2.7.0-rc.2 | 377 | 1/10/2023 |
2.7.0-rc.1 | 388 | 1/2/2023 |
2.6.0 | 3,862 | 12/23/2022 |
2.5.2 | 2,482 | 12/8/2022 |
2.5.1 | 2,469 | 11/17/2022 |
2.5.0 | 2,996 | 11/3/2022 |
2.5.0-rc1 | 862 | 10/27/2022 |
2.4.0 | 6,700 | 9/26/2022 |
2.4.0-rc.3 | 124 | 9/26/2022 |
2.4.0-rc.2 | 133 | 9/25/2022 |
2.4.0-rc.1 | 113 | 9/21/2022 |
2.3.2 | 2,595 | 9/8/2022 |
2.3.1 | 9,330 | 8/17/2022 |
2.3.1-rc.1 | 252 | 8/5/2022 |
2.3.0 | 2,243 | 8/5/2022 |
2.3.0-rc.3 | 119 | 8/2/2022 |
2.3.0-rc.2 | 146 | 7/29/2022 |
2.2.1 | 2,325 | 7/27/2022 |
2.2.0 | 1,822 | 7/24/2022 |
2.2.0-rc.1 | 169 | 7/23/2022 |
2.1.4 | 2,299 | 7/16/2022 |
2.1.3 | 3,041 | 6/28/2022 |
2.1.1 | 1,517 | 6/23/2022 |
2.1.0 | 1,604 | 6/16/2022 |
2.1.0-rc.1 | 180 | 6/10/2022 |
2.0.1 | 2,066 | 5/27/2022 |
2.0.1-rc.1 | 193 | 5/26/2022 |
2.0.0 | 1,284 | 5/22/2022 |
2.0.0-rc.1 | 140 | 5/21/2022 |
1.7.0-rc.1 | 165 | 5/11/2022 |
1.6.0 | 5,941 | 3/25/2022 |
1.6.0-rc.1 | 184 | 3/20/2022 |
1.5.0 | 7,225 | 12/13/2021 |
1.5.0-rc.1 | 225 | 12/11/2021 |
1.4.0 | 1,825 | 12/1/2021 |
1.4.0-rc.1 | 170 | 11/27/2021 |
1.3.1 | 1,153 | 11/22/2021 |
1.3.0 | 2,111 | 11/20/2021 |
1.3.0-rc.1 | 229 | 11/15/2021 |
1.2.0 | 1,600 | 11/8/2021 |
1.2.0-rc.1 | 204 | 11/5/2021 |
1.1.0 | 1,079 | 11/2/2021 |
1.1.0-rc.1 | 196 | 10/27/2021 |
1.0.0 | 4,896 | 10/19/2021 |
1.0.0-rc.1 | 489 | 10/19/2021 |