JohBloch.ConfluentKafka.SchemaRegistryExtClient 1.1.0

.NET CLI:
dotnet add package JohBloch.ConfluentKafka.SchemaRegistryExtClient --version 1.1.0

Package Manager Console (intended for Visual Studio; uses the NuGet module's version of Install-Package):
NuGet\Install-Package JohBloch.ConfluentKafka.SchemaRegistryExtClient -Version 1.1.0

PackageReference (for projects that support PackageReference, copy this XML node into the project file):
<PackageReference Include="JohBloch.ConfluentKafka.SchemaRegistryExtClient" Version="1.1.0" />

Central Package Management (for projects that support CPM, copy this XML node into the solution Directory.Packages.props file to version the package):
<PackageVersion Include="JohBloch.ConfluentKafka.SchemaRegistryExtClient" Version="1.1.0" />

Then reference the package without a version in the project file:
<PackageReference Include="JohBloch.ConfluentKafka.SchemaRegistryExtClient" />

Paket:
paket add JohBloch.ConfluentKafka.SchemaRegistryExtClient --version 1.1.0

F# Interactive and Polyglot Notebooks (copy the #r directive into the interactive tool or the script source):
#r "nuget: JohBloch.ConfluentKafka.SchemaRegistryExtClient, 1.1.0"

C# file-based apps, starting in .NET 10 preview 4 (copy the #:package directive into a .cs file before any lines of code):
#:package JohBloch.ConfluentKafka.SchemaRegistryExtClient@1.1.0

Cake Addin:
#addin nuget:?package=JohBloch.ConfluentKafka.SchemaRegistryExtClient&version=1.1.0

Cake Tool:
#tool nuget:?package=JohBloch.ConfluentKafka.SchemaRegistryExtClient&version=1.1.0

JohBloch.ConfluentKafka.SchemaRegistryExtClient

Extended client library for Confluent Kafka Schema Registry with DI integration, caching, token refresh support, and optional OpenTelemetry metrics.

Features

  • Targets net8.0 and net10.0
  • Easy integration with Microsoft DI (AddSchemaRegistryExtClient)
  • Pluggable schema cache: in-memory by default, or replace with an external/distributed cache (useful for Azure Functions and other scale-out hosts)
  • Optional token refresh (OAuth/custom bearer token) via delegate or DI-provided ITokenManager, so a singleton client keeps working after tokens expire
  • Subject naming strategies aligned with Confluent serializer configuration
  • Optional OpenTelemetry-compatible metrics

What is “extended” compared to Confluent’s client?

SchemaRegistryExtClient wraps Confluent’s Schema Registry client and adds:

  • A caching layer (ISchemaCache) so repeated lookups/registrations don’t always hit Schema Registry.
  • A token refresh mechanism (ITokenManager / token refresh delegate) so the client can run as a long-lived singleton without restarting when tokens expire.
  • DI-friendly registration so you can inject ISchemaRegistryExtClient / ISchemaRegistrar in your app.

Caching (in-memory vs external)

By default, AddSchemaRegistryExtClient registers an in-memory cache (InMemorySchemaCache). This is great for single-process apps.

If you run in a scale-out environment (for example Azure Functions with multiple instances), you typically want an external/distributed cache so cache entries are shared across instances. You can do that by implementing ISchemaCache and overriding the DI registration:

services.AddSchemaRegistryExtClient(config, tokenRefreshFunc: null);

// Override the default in-memory cache registration (last registration wins).
services.AddSingleton<ISchemaCache>(sp => new MyExternalSchemaCache(/* Redis, etc. */));

The cache stores schema lookups (by subject/version and by schema id) and supports TTL and negative caching.
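
To make TTL and negative caching concrete, here is a minimal, library-independent sketch. It does not use the package's InMemorySchemaCache or ISchemaCache; the names entries, Set, and TryGet and the hand-advanced clock are hypothetical, and a null value stands for "looked up, nothing found".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a schema cache: key -> (value, expiry).
// A null value is a negative entry ("we looked, nothing was there").
var entries = new Dictionary<string, (string? Value, DateTime ExpiresAt)>();
var ttl = TimeSpan.FromSeconds(300);
var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); // fake clock for the demo

void Set(string key, string? value) => entries[key] = (value, now + ttl);

// Returns true on a cache hit, including negative hits where value is null.
bool TryGet(string key, out string? value)
{
  value = null;
  if (!entries.TryGetValue(key, out var entry)) return false;
  if (entry.ExpiresAt <= now)
  {
    entries.Remove(key); // expired: treat as a miss
    return false;
  }
  value = entry.Value;
  return true;
}

Set("orders-value", "{\"type\":\"record\"}");
Set("missing-subject", null); // negative entry

Console.WriteLine(TryGet("missing-subject", out var v1) && v1 is null); // True
now += TimeSpan.FromSeconds(301); // advance the fake clock past the TTL
Console.WriteLine(TryGet("orders-value", out _)); // False
```

The negative entry lets a caller skip a repeated registry round trip for a subject that is known to be absent, at the cost of not seeing a newly registered schema until the TTL elapses.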

Example: Redis-backed cache (StackExchange.Redis)

This is a minimal example of using Redis as the shared cache (useful for Azure Functions / multiple instances).

  1. Add package:
dotnet add package StackExchange.Redis
  2. Register the client, Redis connection, and override ISchemaCache:
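using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using StackExchange.Redis;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Interfaces;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Models;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Services;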

services.AddSchemaRegistryExtClient(config, tokenRefreshFunc: null, configure: opts =>
{
  opts.CacheOptions.TimeToLiveSeconds = 300;
});

services.AddSingleton<IConnectionMultiplexer>(_ => ConnectionMultiplexer.Connect("localhost:6379"));

// Override the default in-memory cache registration (last registration wins).
services.AddSingleton<ISchemaCache>(sp =>
  new RedisSchemaCache(
    sp.GetRequiredService<IConnectionMultiplexer>(),
    sp.GetRequiredService<SchemaClientOptions>().CacheOptions,
    prefix: "schema-registry:"));

sealed class RedisSchemaCache : ISchemaCache
{
  private readonly IDatabase _db;
  private readonly SchemaCacheOptions _options;
  private readonly string _prefix;
  private int _hitCount;
  private int _missCount;

  public RedisSchemaCache(IConnectionMultiplexer mux, SchemaCacheOptions options, string prefix)
  {
    _db = mux.GetDatabase();
    _options = options ?? new SchemaCacheOptions();
    _prefix = string.IsNullOrWhiteSpace(prefix) ? "schema-registry:" : prefix;
  }

  public int HitCount => _hitCount;
  public int MissCount => _missCount;
  public int Count => -1; // Redis doesn't expose a cheap count per prefix
  public event EventHandler<string?>? CacheHit;
  public event EventHandler<string?>? CacheMiss;

  private string K(string key) => _prefix + key;

  public bool TryGet(string? key, out CachedSchemaInfo? schema)
  {
    schema = null;
    if (string.IsNullOrWhiteSpace(key))
    {
      Interlocked.Increment(ref _missCount);
      CacheMiss?.Invoke(this, key);
      return false;
    }

    var val = _db.StringGet(K(key));
    if (!val.HasValue)
    {
      Interlocked.Increment(ref _missCount);
      CacheMiss?.Invoke(this, key);
      return false;
    }

    // Supports negative caching by allowing null payload.
    schema = val == "null" ? null : JsonSerializer.Deserialize<CachedSchemaInfo>(val!);
    Interlocked.Increment(ref _hitCount);
    CacheHit?.Invoke(this, key);
    return true;
  }

  public void Set(string? key, CachedSchemaInfo? schema)
  {
    if (string.IsNullOrWhiteSpace(key)) return;
    var ttl = TimeSpan.FromSeconds(_options.TimeToLiveSeconds);
    var payload = schema == null ? "null" : JsonSerializer.Serialize(schema);
    _db.StringSet(K(key), payload, ttl);
  }

  public void Remove(string? key)
  {
    if (string.IsNullOrWhiteSpace(key)) return;
    _db.KeyDelete(K(key));
  }

  public void Clear()
  {
    // Clearing by prefix requires server-side key scan; keep it host-specific.
    // Many apps skip implementing this in Redis-backed caches.
  }

  public IEnumerable<string> KeysMatchingPrefix(string prefix) => Array.Empty<string>();

  public void Dispose() { }
}

Notes:

  • KeysMatchingPrefix / Clear are intentionally left as no-ops in this minimal sample because prefix scans can be expensive in Redis.
  • For production, prefer key versioning (prefix changes) instead of scanning deletes.
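
The key-versioning idea from the notes above can be sketched without Redis: fold a version number into the prefix and bump it to invalidate every old entry at once, leaving the old keys to age out through their TTL. The names basePrefix, version, Key, and Invalidate are hypothetical. In a shared cache the version itself would need to live somewhere shared (for example app configuration), which this sketch leaves out.

```csharp
using System;

var basePrefix = "schema-registry:";
var version = 1; // assumption: kept in shared configuration in a real deployment

// Builds the effective cache key, e.g. "schema-registry:v1:orders-value".
string Key(string key) => $"{basePrefix}v{version}:{key}";

// "Clears" the cache logically: old keys are never read again and expire via TTL.
void Invalidate() => version++;

Console.WriteLine(Key("orders-value")); // schema-registry:v1:orders-value
Invalidate();
Console.WriteLine(Key("orders-value")); // schema-registry:v2:orders-value
```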

Token refresh (long-lived clients)

Schema Registry auth tokens often expire. If you pass a token refresh delegate (Func<Task<(string token, DateTime expiresAt)>>) or register an ITokenManager, the client will refresh tokens when needed and recreate the underlying Confluent client when the bearer token changes.

This enables running SchemaRegistryExtClient as a singleton in DI for days/weeks without manual recycling.
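
As a rough illustration of what "refresh when needed" can look like, the sketch below wraps a refresh delegate of the same shape and only calls it when no token is cached or the cached one is about to expire. This is not the library's TokenManager; GetTokenAsync, the one-minute skew, and the fake refresh delegate are assumptions made for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var calls = 0;

// Stand-in for a real token endpoint call; same shape as the refresh delegate.
Func<Task<(string token, DateTime expiresAt)>> refresh = () =>
{
  calls++;
  return Task.FromResult(($"token-{calls}", DateTime.UtcNow.AddHours(1)));
};

string? cached = null;
var cachedExpiry = DateTime.MinValue;
var gate = new SemaphoreSlim(1, 1);   // serializes concurrent refresh attempts
var skew = TimeSpan.FromMinutes(1);   // refresh slightly before the real expiry

// Returns the cached token, refreshing only when it is missing or about to expire.
async Task<string> GetTokenAsync()
{
  await gate.WaitAsync();
  try
  {
    if (cached is null || DateTime.UtcNow + skew >= cachedExpiry)
    {
      (cached, cachedExpiry) = await refresh();
    }
    return cached;
  }
  finally
  {
    gate.Release();
  }
}

var first = await GetTokenAsync();
var second = await GetTokenAsync();
Console.WriteLine($"{first} {second} calls={calls}"); // token-1 token-1 calls=1
```

The second call reuses the cached token, so the token endpoint is hit once per token lifetime rather than once per Schema Registry request.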

What if I don't provide a token refresh method?

Nothing special happens: the library will not create/use a TokenManager, and it will call Schema Registry using only what you configured on SchemaRegistryConfig (for example API key / basic auth).

  • If your Schema Registry uses OAuth/Bearer tokens, you must provide tokenRefreshFunc (or register your own ITokenManager) or requests will fail with auth errors.
  • If you use API Key / Basic Auth, you can omit token refresh entirely:
var config = new SchemaRegistryConfig { Url = "https://your-registry" };
config.BasicAuthUserInfo = "<API_KEY>:<API_SECRET>";

services.AddSchemaRegistryExtClient(config, tokenRefreshFunc: null);

Getting Started

See docs/usage.md for examples of connecting to Schema Registry with API Key, OAuth, or custom token refresh.

Using this library from another library (e.g. ConfluentKafkaProducerClient)

It’s safe and common to depend on JohBloch.ConfluentKafka.SchemaRegistryExtClient from another reusable library.

Best practice is:

  • Your reusable library exposes one AddXyz(...) extension method.
  • That method registers dependencies into the host container, but never calls BuildServiceProvider().
  • The final app/host calls AddXyz(...) once.

Example (inside your producer library):

using Confluent.SchemaRegistry;
using Microsoft.Extensions.DependencyInjection;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Models;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Services;

public sealed class ProducerClientOptions
{
  // Settable (not init-only) so the host can assign these inside the configure callback.
  public SchemaRegistryConfig? SchemaRegistryConfig { get; set; }
  public Func<IServiceProvider, Task<(string token, DateTime expiresAt)>>? SchemaRegistryTokenRefresh { get; set; }
  public Action<SchemaClientOptions>? ConfigureSchemaClient { get; set; }
}

public static class ServiceCollectionExtensions
{
  public static IServiceCollection AddConfluentKafkaProducerClient(
    this IServiceCollection services,
    Action<ProducerClientOptions> configure)
  {
    ArgumentNullException.ThrowIfNull(configure);

    // Build the options by running the host's callback, then validate them.
    var opts = new ProducerClientOptions();
    configure(opts);
    var registryConfig = opts.SchemaRegistryConfig
      ?? throw new ArgumentException("SchemaRegistryConfig must be set.", nameof(configure));

    // Forward Schema Registry registration to the host container.
    if (opts.SchemaRegistryTokenRefresh != null)
    {
      services.AddSchemaRegistryExtClient(
        registryConfig,
        tokenRefreshFunc: opts.SchemaRegistryTokenRefresh,
        configure: opts.ConfigureSchemaClient);
    }
    else
    {
      services.AddSchemaRegistryExtClient(
        registryConfig,
        tokenRefreshFunc: null,
        configure: opts.ConfigureSchemaClient);
    }

    // Then register your own producer client(s) that depend on ISchemaRegistryExtClient/ISchemaRegistrar.
    // services.AddSingleton<IConfluentKafkaProducerClient, ConfluentKafkaProducerClient>();

    return services;
  }
}

In the final app/host you can now do:

services.AddHttpClient("oauth");

services.AddConfluentKafkaProducerClient(opts =>
{
  opts.SchemaRegistryConfig = new SchemaRegistryConfig { Url = "https://your-registry" };
  opts.SchemaRegistryTokenRefresh = async sp =>
  {
    var http = sp.GetRequiredService<IHttpClientFactory>().CreateClient("oauth");
    // call token endpoint, then return (token, expiresAt)
    return ("token", DateTime.UtcNow.AddHours(1));
  };
  opts.ConfigureSchemaClient = schemaOpts =>
  {
    schemaOpts.LogicalCluster = Environment.GetEnvironmentVariable("OAUTH_LOGICAL_CLUSTER");
    schemaOpts.IdentityPoolId = Environment.GetEnvironmentVariable("OAUTH_IDENTITY_POOL_ID");
  };
});

Subject Name Strategy

You can configure how subjects are derived by the client using SchemaClientOptions.SubjectNameStrategy. This aligns subject naming with serializer configuration (for example, the serializer setting subject.name.strategy).

  • TopicName — subjects are topic-key or topic-value.
  • TopicRecordName — subjects are topic-recordType when a recordType is provided.
  • RecordName — subjects are the recordType itself.

If SubjectNameStrategy is not set, the client uses a legacy fallback: prefer topic-record when recordType is provided, otherwise topic-key / topic-value.

Example:

var options = new SchemaClientOptions
{
    SubjectNameStrategy = SubjectNameStrategy.TopicRecordName
};
var client = new SchemaRegistryExtClient(config, tokenRefreshFunc, cache, options);

If you need custom subject naming logic, you can provide a concrete implementation via SchemaClientOptions.SubjectNameStrategyImplementation (this takes precedence over the enum).

If you are using Confluent serializers, ensure the serializer's subject.name.strategy setting matches this option to avoid subject mismatches.
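
The naming rules described in this section can be sketched as a small function. This is an illustration only, not the library's implementation: strings stand in for the SubjectNameStrategy enum so the snippet is self-contained, null means "not set" (the legacy fallback), and the behavior when recordType is missing for TopicRecordName or RecordName is an assumption, since the text above does not specify it.

```csharp
using System;

// strategy: "TopicName", "TopicRecordName", "RecordName", or null for the legacy fallback.
string Subject(string? strategy, string topic, bool isKey, string? recordType = null)
{
  var topicSubject = $"{topic}-{(isKey ? "key" : "value")}";
  var hasRecord = !string.IsNullOrWhiteSpace(recordType);
  return strategy switch
  {
    "TopicName" => topicSubject,
    // Assumption: fall back to the topic-based subject when no recordType is given.
    "TopicRecordName" => hasRecord ? $"{topic}-{recordType}" : topicSubject,
    // Assumption: RecordName without a recordType is treated as an error.
    "RecordName" => hasRecord
      ? recordType!
      : throw new ArgumentException("recordType is required for RecordName."),
    null => hasRecord ? $"{topic}-{recordType}" : topicSubject, // legacy fallback
    _ => throw new ArgumentOutOfRangeException(nameof(strategy)),
  };
}

Console.WriteLine(Subject("TopicName", "orders", isKey: false));                       // orders-value
Console.WriteLine(Subject("TopicRecordName", "orders", false, "OrderCreated"));        // orders-OrderCreated
Console.WriteLine(Subject("RecordName", "orders", false, "OrderCreated"));             // OrderCreated
Console.WriteLine(Subject(null, "orders", isKey: true));                               // orders-key
```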

Supported Authentication Methods

  • API Key (Basic Auth/SCRAM, Confluent Cloud)
  • OAuth Bearer Token (Client Credentials)
  • Custom Bearer Token (manual refresh)

Usage examples (🔐 Authentication)

local.settings.json templates

If you're using Azure Functions (or another host that supports local.settings.json-style local secrets), here are ready-to-use templates for the two most common setups.

OAuth2 (Client Credentials):

{
  "IsEncrypted": false,
  "Values": {
    "SCHEMA_REGISTRY_URL": "https://your-registry.example.com",
    "OAUTH_TOKEN_ENDPOINT": "https://identity.example.com/oauth2/token",
    "OAUTH_CLIENT_ID": "your-client-id",
    "OAUTH_CLIENT_SECRET": "your-client-secret",
    "OAUTH_SCOPE": "registry.write",
    "OAUTH_LOGICAL_CLUSTER": "lkc-xxxxx",
    "OAUTH_IDENTITY_POOL_ID": "pool-xxxxx"
  }
}

API Key (Confluent Cloud / Basic Auth):

{
  "IsEncrypted": false,
  "Values": {
    "SCHEMA_REGISTRY_URL": "https://your-confluent-cloud-registry",
    "SCHEMA_REGISTRY_API_KEY": "<API_KEY>",
    "SCHEMA_REGISTRY_API_SECRET": "<API_SECRET>"
  }
}

1) OAuth2 (Client Credentials / Bearer token)

// Minimal example using DI + the library's built-in TokenManager (registered as ITokenManager)
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Confluent.SchemaRegistry;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Interfaces;
using JohBloch.ConfluentKafka.SchemaRegistryExtClient.Services;

var config = new SchemaRegistryConfig
{
  Url = Environment.GetEnvironmentVariable("SCHEMA_REGISTRY_URL") ?? "https://your-registry.example.com"
};

// You only provide the refresh function. The library will wrap it in the built-in TokenManager.
// No initial access token is required — the TokenManager calls this function on-demand
// (first operation / near expiry) and caches the token until it needs refreshing.
Func<Task<(string token, DateTime expiresAt)>> tokenRefreshFunc = async () =>
{
  using var http = new HttpClient();

  static string Env(string key) => Environment.GetEnvironmentVariable(key)
    ?? throw new InvalidOperationException($"Missing env var: {key}");
  static string? EnvOpt(string key) => Environment.GetEnvironmentVariable(key);

  var form = new Dictionary<string, string>
  {
    ["grant_type"] = "client_credentials",
    ["client_id"] = Env("OAUTH_CLIENT_ID"),
    ["client_secret"] = Env("OAUTH_CLIENT_SECRET"),
  };
  var scope = EnvOpt("OAUTH_SCOPE");
  if (!string.IsNullOrWhiteSpace(scope)) form["scope"] = scope;

  using var resp = await http.PostAsync(Env("OAUTH_TOKEN_ENDPOINT"), new FormUrlEncodedContent(form));
  resp.EnsureSuccessStatusCode();

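  // JsonDocument is disposable; the tuple below only holds copies of the values.
  using var payload = await resp.Content.ReadFromJsonAsync<JsonDocument>()
    ?? throw new InvalidOperationException("Token endpoint returned an empty response.");
  var accessToken = payload.RootElement.GetProperty("access_token").GetString()
    ?? throw new InvalidOperationException("Token response did not contain access_token.");
  var expiresIn = payload.RootElement.TryGetProperty("expires_in", out var p) ? p.GetInt32() : 3600;
  return (accessToken, DateTime.UtcNow.AddSeconds(expiresIn));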
};

var services = new ServiceCollection();
services.AddSchemaRegistryExtClient(
  config,
  tokenRefreshFunc,
  opts =>
  {
    // Required for some Confluent Cloud OAuth setups
    opts.LogicalCluster = Environment.GetEnvironmentVariable("OAUTH_LOGICAL_CLUSTER");
    opts.IdentityPoolId = Environment.GetEnvironmentVariable("OAUTH_IDENTITY_POOL_ID");
  });

var sp = services.BuildServiceProvider();

// Use the extended client (it will fetch/refresh tokens as needed)
var client = sp.GetRequiredService<ISchemaRegistryExtClient>();

Tip: If you want to use IHttpClientFactory (recommended), use the overload that receives IServiceProvider:

services.AddHttpClient("oauth");

services.AddSchemaRegistryExtClient(
  config,
  tokenRefreshFunc: async sp =>
  {
    var http = sp.GetRequiredService<IHttpClientFactory>().CreateClient("oauth");
    // call token endpoint, then return (token, expiresAt)
    return ("token", DateTime.UtcNow.AddHours(1));
  },
  configure: opts =>
  {
    opts.LogicalCluster = Environment.GetEnvironmentVariable("OAUTH_LOGICAL_CLUSTER");
    opts.IdentityPoolId = Environment.GetEnvironmentVariable("OAUTH_IDENTITY_POOL_ID");
  });

You can also use MSAL or IdentityModel to get tokens and return (token, expiresAt).

See the "local.settings.json templates" section above.

Use these values in your token refresh implementation (or load them into environment variables before running tests/app):

  • PowerShell: $env:SCHEMA_REGISTRY_URL = (Get-Content -Raw local.settings.json | ConvertFrom-Json).Values.SCHEMA_REGISTRY_URL
  • Bash: export SCHEMA_REGISTRY_URL=$(jq -r '.Values.SCHEMA_REGISTRY_URL' local.settings.json)
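
If you would rather load all values at once from C# (for example at the start of a test run), a sketch along these lines copies every string under "Values" into the process environment. LoadLocalSettings is a hypothetical helper, not part of the library, and it assumes the file follows the template layout shown above.

```csharp
using System;
using System.IO;
using System.Text.Json;

// Copies every string entry under "Values" into the process environment.
// Returns the number of variables set.
int LoadLocalSettings(string path)
{
  using var doc = JsonDocument.Parse(File.ReadAllText(path));
  if (!doc.RootElement.TryGetProperty("Values", out var values)
      || values.ValueKind != JsonValueKind.Object)
  {
    return 0;
  }

  var count = 0;
  foreach (var prop in values.EnumerateObject())
  {
    if (prop.Value.ValueKind != JsonValueKind.String) continue;
    Environment.SetEnvironmentVariable(prop.Name, prop.Value.GetString());
    count++;
  }
  return count;
}

if (File.Exists("local.settings.json"))
{
  var loaded = LoadLocalSettings("local.settings.json");
  Console.WriteLine($"Loaded {loaded} settings.");
}
```

The variables are set for the current process only, so secrets from the file do not leak into the surrounding shell session.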

2) API Key (Confluent Cloud / Basic Auth)

// Confluent Cloud: use the API key and secret as basic auth credentials
var config = new SchemaRegistryConfig { Url = "https://your-confluent-cloud-registry" };
// Option A: set the BasicAuthUserInfo property
config.BasicAuthUserInfo = "<API_KEY>:<API_SECRET>";

// Option B (equivalent; use one option, not both): set the raw config key
config.Set("basic.auth.user.info", "<API_KEY>:<API_SECRET>");

var client = new SchemaRegistryExtClient(config);

See the "local.settings.json templates" section above.

To use the local settings values for the API key sample, either set BasicAuthUserInfo from the SCHEMA_REGISTRY_API_KEY/SCHEMA_REGISTRY_API_SECRET environment variables, or load them into the environment using the PowerShell/Bash examples above.
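
A small sketch of the first option: compose the "<key>:<secret>" value from the two environment variables and fail early when one is missing. BasicAuthFromEnv is a hypothetical helper; only the variable names and the BasicAuthUserInfo property come from the samples above.

```csharp
using System;

// Builds the "<key>:<secret>" value expected by BasicAuthUserInfo.
string BasicAuthFromEnv()
{
  string Env(string name) => Environment.GetEnvironmentVariable(name)
    ?? throw new InvalidOperationException($"Missing env var: {name}");

  var key = Env("SCHEMA_REGISTRY_API_KEY");
  var secret = Env("SCHEMA_REGISTRY_API_SECRET");
  return $"{key}:{secret}";
}

// Usage with the config from the API key sample (requires Confluent.SchemaRegistry):
// config.BasicAuthUserInfo = BasicAuthFromEnv();
```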


OpenTelemetry metrics (📈)

You can enable OpenTelemetry-compatible metrics when registering services with DI using AddSchemaRegistryExtClient(..., enableOpenTelemetryMetrics: true); see docs/usage.md for details.

License

MIT

Compatible and additional computed target framework versions

net8.0 and net10.0 are compatible. NuGet additionally computes compatibility with net9.0 and with the platform-specific variants (android, browser, ios, maccatalyst, macos, tvos, windows) of net8.0, net9.0, and net10.0.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on JohBloch.ConfluentKafka.SchemaRegistryExtClient:

Package Downloads
JohBloch.ConfluentKafka.Clients.Core

Core building blocks for JohBloch.ConfluentKafka.Clients (options, models, security, config validation, and shared helpers).


Version  Downloads  Last Updated
2.2.2    279        4/21/2026
1.1.2    946        2/10/2026
1.1.1    114        2/9/2026
1.1.0    234        2/4/2026
1.0.2    122        2/3/2026
1.0.1    121        2/3/2026
1.0.0    116        2/3/2026