Fluvio.Client.Abstractions
0.1.3
dotnet add package Fluvio.Client.Abstractions --version 0.1.3
NuGet\Install-Package Fluvio.Client.Abstractions -Version 0.1.3
<PackageReference Include="Fluvio.Client.Abstractions" Version="0.1.3" />
<PackageVersion Include="Fluvio.Client.Abstractions" Version="0.1.3" />
<PackageReference Include="Fluvio.Client.Abstractions" />
paket add Fluvio.Client.Abstractions --version 0.1.3
#r "nuget: Fluvio.Client.Abstractions, 0.1.3"
#:package Fluvio.Client.Abstractions@0.1.3
#addin nuget:?package=Fluvio.Client.Abstractions&version=0.1.3
#tool nuget:?package=Fluvio.Client.Abstractions&version=0.1.3
Fluvio.Client - Pure Managed .NET Client for Fluvio
A pure managed .NET client library for Fluvio, the distributed streaming platform. This client is built entirely in C# without native dependencies, implementing the Fluvio wire protocol directly.
Features
- Pure Managed Implementation - No native dependencies, works on any .NET platform
- Producer API - Send messages to Fluvio topics with at-most-once or at-least-once delivery guarantees
- Consumer API - Stream messages from topics with configurable offsets and isolation levels
- Admin API - Create, delete, and list topics programmatically
Installation
dotnet add package Fluvio.Client
Quick Start
Producer Example
using System.Text;
using Fluvio.Client;
using Fluvio.Client.Abstractions;
// Connect to Fluvio cluster
await using var client = await FluvioClient.ConnectAsync();
// Get a producer
var producer = client.Producer();
// Send a message
var message = Encoding.UTF8.GetBytes("Hello, Fluvio!");
var offset = await producer.SendAsync("my-topic", message);
Console.WriteLine($"Sent message at offset {offset}");
Consumer Example
using System.Text;
using Fluvio.Client;
using Fluvio.Client.Abstractions;
// Connect to Fluvio cluster
await using var client = await FluvioClient.ConnectAsync();
// Get a consumer
var consumer = client.Consumer();
// Stream messages continuously
await foreach (var record in consumer.StreamAsync("my-topic", offset: 0))
{
var value = Encoding.UTF8.GetString(record.Value.Span);
Console.WriteLine($"[Offset: {record.Offset}] {value}");
}
Admin Example
using Fluvio.Client;
using Fluvio.Client.Abstractions;
await using var client = await FluvioClient.ConnectAsync();
var admin = client.Admin();
// Create a topic
await admin.CreateTopicAsync("my-topic", new TopicSpec(
Partitions: 3,
ReplicationFactor: 1
));
// List all topics
var topics = await admin.ListTopicsAsync();
foreach (var topic in topics)
{
Console.WriteLine($"Topic: {topic.Name}, Partitions: {topic.Partitions}");
}
API Documentation
FluvioClient
Main entry point for connecting to a Fluvio cluster.
var client = new FluvioClient(new FluvioClientOptions
{
SpuEndpoint = "localhost:9010", // SPU for data operations
ScEndpoint = "localhost:9003", // SC for admin operations
UseTls = false,
ClientId = "my-client",
ConnectionTimeout = TimeSpan.FromSeconds(30),
RequestTimeout = TimeSpan.FromSeconds(60),
EnableAutoReconnect = true, // Automatic reconnection on failure
MaxReconnectAttempts = 5, // Up to 5 reconnection attempts
ReconnectDelay = TimeSpan.FromSeconds(2) // 2s base delay with exponential backoff
});
await client.ConnectAsync();
Producer API
var producer = client.Producer(new ProducerOptions
{
DeliveryGuarantee = DeliveryGuarantee.AtLeastOnce,
BatchSize = 1000,
LingerTime = TimeSpan.FromMilliseconds(100)
});
// Send single message
var offset = await producer.SendAsync(
topic: "my-topic",
value: messageBytes,
key: keyBytes // optional
);
// Send batch
var records = new List<ProduceRecord>
{
new(Encoding.UTF8.GetBytes("message 1")),
new(Encoding.UTF8.GetBytes("message 2"))
};
var offsets = await producer.SendBatchAsync("my-topic", records);
Consumer API
var consumer = client.Consumer(new ConsumerOptions
{
MaxBytes = 1024 * 1024, // 1MB
IsolationLevel = IsolationLevel.ReadCommitted
});
// Stream continuously
await foreach (var record in consumer.StreamAsync("my-topic", partition: 0, offset: 0))
{
// Process record
}
// Fetch batch
var records = await consumer.FetchBatchAsync(
topic: "my-topic",
partition: 0,
offset: 0,
maxBytes: 1024 * 1024
);
SmartModule Support
Apply transformations and filters using SmartModules:
// Using a predefined SmartModule
var consumer = client.Consumer(new ConsumerOptions
{
SmartModules =
[
new SmartModuleInvocation
{
Name = "my-filter", // Name of pre-deployed SmartModule
Kind = SmartModuleKindType.Filter,
Parameters = new Dictionary<string, string>
{
["threshold"] = "100"
}
}
]
});
// Chain multiple SmartModules
var consumer = client.Consumer(new ConsumerOptions
{
SmartModules =
[
new SmartModuleInvocation
{
Name = "filter-errors",
Kind = SmartModuleKindType.Filter
},
new SmartModuleInvocation
{
Name = "map-to-json",
Kind = SmartModuleKindType.Map
}
]
});
// Using an aggregate SmartModule with initial accumulator
var consumer = client.Consumer(new ConsumerOptions
{
SmartModules =
[
new SmartModuleInvocation
{
Name = "running-sum",
Kind = SmartModuleKindType.Aggregate,
Accumulator = Encoding.UTF8.GetBytes("0") // Initial value
}
]
});
Admin API
var admin = client.Admin();
// Create topic
await admin.CreateTopicAsync("my-topic", new TopicSpec
{
Partitions = 3,
ReplicationFactor = 1,
IgnoreRackAssignment = false
});
// Delete topic
await admin.DeleteTopicAsync("my-topic");
// List topics
var topics = await admin.ListTopicsAsync();
// Get specific topic
var topic = await admin.GetTopicAsync("my-topic");
Architecture
This client implements the Fluvio wire protocol directly in managed C#:
- Binary Protocol Layer - Big-endian binary encoding/decoding (Kafka-inspired)
- TCP Connection Layer - Async TCP client with TLS support and connection pooling
- Request/Response Handling - Correlation ID-based multiplexing
- High-Level APIs - Producer, Consumer, and Admin abstractions
Protocol Compatibility
This client is compatible with Fluvio's wire protocol, which is inspired by Apache Kafka's protocol but optimized for Fluvio's architecture. The protocol uses:
- Big-endian byte order
- Size-prefixed messages
- Correlation ID-based request/response matching
- Binary key/value records
Requirements
- .NET 8.0 or later
- Fluvio cluster (local or remote)
Performance
The Fluvio C# client delivers production-ready performance:
- Producer: 113-115 μs per message, 738,550 msg/s in batch mode (100 messages)
- Consumer: ~1 μs per message with streaming, 0.65 ms for batch of 1000
- Protocol: 5 ns record creation, zero-copy design with minimal allocations
See BENCHMARK_RESULTS.md for detailed performance analysis.
Observability & Testability
The client includes comprehensive observability features for production deployments:
Distributed Tracing
Built-in support for distributed tracing using .NET's ActivitySource (OpenTelemetry-compatible):
// Enable tracing by configuring an ActivityListener or using OpenTelemetry SDK
using var listener = new ActivityListener
{
ShouldListenTo = source => source.Name == "Fluvio.Client",
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
ActivityStarted = activity => Console.WriteLine($"Started: {activity.OperationName}"),
ActivityStopped = activity => Console.WriteLine($"Stopped: {activity.OperationName} ({activity.Duration})")
};
ActivitySource.AddActivityListener(listener);
// Operations are automatically traced
var offset = await producer.SendAsync("my-topic", messageBytes);
await foreach (var record in consumer.StreamAsync("my-topic")) { /* ... */ }
Trace data includes:
- Operation context: Topic, partition, offset, record count
- Performance metrics: Request/response sizes, timing
- Correlation: Automatic parent-child span relationships
- Error tracking: Exception details and error codes
Development
Building
dotnet build
Running Tests
dotnet test
Running Benchmarks
# Start Fluvio cluster
fluvio cluster start
# Run performance benchmarks
cd benchmarks/Fluvio.Client.Benchmarks
dotnet run -c Release
Running Examples
# Run producer example
dotnet run --project examples/ProducerExample
# Run consumer example (in another terminal)
dotnet run --project examples/ConsumerExample
Documentation
- GETTING_STARTED.md - Quick start guide and examples
- ARCHITECTURE.md - System design and architecture details
- PROTOCOL_REFERENCE.md - Wire protocol implementation details
- BENCHMARK_RESULTS.md - Performance benchmarks and analysis
- TODO.md - Feature roadmap and Rust parity tracking
Limitations
Important Note: This is an initial implementation that demonstrates the feasibility of a pure managed .NET client for Fluvio. The actual Fluvio wire protocol may differ from this implementation, which is based on publicly available documentation and the Kafka-inspired protocol structure.
For production use, you may need to:
- Verify protocol compatibility with your Fluvio version
- Add more robust error handling
- Implement additional features like topic mirroring, etc.
- Add comprehensive integration tests with a real Fluvio cluster
Contributing
Contributions are welcome! Please feel free to submit issues or pull requests.
License
MIT License - See LICENSE file for details
Acknowledgments
Disclaimer
This is an unofficial client implementation. For official client libraries, please visit the Fluvio documentation.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Fluvio.Client.Abstractions:
| Package | Downloads |
|---|---|
|
Fluvio.Client
Pure managed .NET client for high performance streaming platform fluvio.io. |
GitHub repositories
This package is not used by any popular GitHub repositories.