Orleans.Streams.Kafkav2 2.0.0

Orleans.Streams.Kafka

This library is a fork of https://github.com/jonathansant/Orleans.Streams.Kafka

Kafka persistent stream provider for Microsoft Orleans that uses the Confluent SDK. In addition to streaming messages produced by Orleans, this provider can merge external messages (not generated by Orleans) into the Orleans streaming system, so they are consumed as if Orleans had produced them.

Dependencies

Orleans.Streams.Kafka has the following dependencies:

  • Microsoft Orleans: 8.0.0
  • Confluent.Kafka: 2.4.0
  • Orleans.Streams.Utils

Installation

To start working with Orleans.Streams.Kafka, complete the following steps:

  1. Install Kafka on a machine (or cluster) that you have access to, for example by using the Confluent Platform.
  2. Create topics in Kafka with as many partitions as each topic needs.
  3. Install the Orleans.Streams.Kafkav2 NuGet package, for example with: dotnet add package Orleans.Streams.Kafkav2 --version 2.0.0
  4. Add the new stream provider to the silo configuration with the required parameters and any optional ones. The configurable parameters of KafkaStreamProvider are listed under Configurable Values.

Example KafkaStreamProvider configuration:

public class SiloBuilderConfigurator : ISiloBuilderConfigurator
{
	public void Configure(ISiloBuilder hostBuilder)
		=> hostBuilder
				.AddMemoryGrainStorage("PubSubStore")
				.AddKafka("KafkaStreamProvider")
				.WithOptions(options =>
				{
					options.BrokerList = new [] {"localhost:8080"};
					options.ConsumerGroupId = "E2EGroup";
					options.ConsumeMode = ConsumeMode.StreamEnd;

					options
						.AddTopic(Consts.StreamNamespace)
						.AddTopic(Consts.StreamNamespace2, new TopicCreationConfig { AutoCreate = true, Partitions = 2, ReplicationFactor = 1 })
						.AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal)
						;
				})
				.AddJson()
				.AddLoggingTracker()
				.Build()
				;
}

public class ClientBuilderConfigurator : IClientBuilderConfigurator
{
	public virtual void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
		=> clientBuilder
				.AddKafka("KafkaStreamProvider")
				.WithOptions(options =>
				{
					options.BrokerList = new [] {"localhost:8080"};
					options.ConsumerGroupId = "E2EGroup";

					options
						.AddTopic(Consts.StreamNamespace)
						.AddTopic(Consts.StreamNamespace2, new TopicCreationConfig { AutoCreate = true, Partitions = 2, ReplicationFactor = 1 })
						.AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal)
						;
				})
				.AddJson()
				.Build()
				;
}

Usage

Producing:

var testGrain = clusterClient.GetGrain<ITestGrain>(grainId);

var result = await testGrain.GetThePhrase();

Console.BackgroundColor = ConsoleColor.DarkMagenta;
Console.WriteLine(result);

// The provider name must match the one registered with AddKafka(...).
var streamProvider = clusterClient.GetStreamProvider("KafkaStreamProvider");
// In Orleans 7 and later, the namespace (the Kafka topic) comes first, then the stream key.
var stream = streamProvider.GetStream<TestModel>("topic1", "streamId");
await stream.OnNextAsync(new TestModel
{
	Greeting = "hello world"
});

Consuming:

var kafkaProvider = GetStreamProvider("KafkaStreamProvider");
// In Orleans 7 and later, the namespace (the Kafka topic) comes first, then the stream key.
var testStream = kafkaProvider.GetStream<TestModel>("topic1", "streamId");

// Resume existing subscriptions after the grain has been deactivated and reactivated.
// Subscribe only when none exist, so that no duplicate subscription is registered.
var subscriptionHandles = await testStream.GetAllSubscriptionHandles();

if (subscriptionHandles.Count > 0)
{
	foreach (var subscriptionHandle in subscriptionHandles)
	{
		await subscriptionHandle.ResumeAsync(OnNextTestMessage);
	}
}
else
{
	await testStream.SubscribeAsync(OnNextTestMessage);
}

Note: The Stream Namespace identifies the Kafka topic.
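
The consuming example passes an OnNextTestMessage handler without defining it. A minimal sketch of such a handler follows, assuming a TestModel with the Greeting property used in the producing example; the TestMessageHandlers class name is hypothetical. The method only needs to match the Func<T, StreamSequenceToken, Task> delegate that SubscribeAsync and ResumeAsync accept.

```csharp
using System;
using System.Threading.Tasks;
using Orleans.Streams;

// Hypothetical model mirroring the one used in the producing example.
public class TestModel
{
	public string Greeting { get; set; }
}

// Hypothetical holder class; in a grain this would typically be a private method.
public static class TestMessageHandlers
{
	// Matches Func<TestModel, StreamSequenceToken, Task>.
	// The sequence token may be null, so it is not dereferenced here.
	public static Task OnNextTestMessage(TestModel message, StreamSequenceToken sequenceToken)
	{
		Console.WriteLine($"Received: {message.Greeting}");
		return Task.CompletedTask;
	}
}
```

Keeping the handler free of blocking calls matters because it runs on the grain's scheduler; longer work is better awaited inside the returned Task.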

Configurable Values

Orleans.Streams.Kafka exposes the following configurable values:

  • Topics: The topics that messages are produced to and consumed from.
  • BrokerList: List of Kafka brokers to connect to.
  • ConsumerGroupId: The consumer group ID used by the Kafka consumer. Default value is orleans-kafka.
  • PollTimeout: How long the Kafka consumer blocks while waiting for messages. Default value is 100ms.
  • PollBufferTimeout: How long the KafkaAdapterReceiver continues to poll for messages for the same batch. Default value is 500ms.
  • AdminRequestTimeout: Timeout for admin requests. Default value is 5s.
  • ConsumeMode: Determines the offset to start consuming from. Default value is ConsumeMode.LastCommittedMessage.
  • ProducerTimeout: Timeout for produce requests. Default value is 5s.
  • RetentionPeriodInMs: Message retention period, in milliseconds, for the Kafka topic. Default value is 7 days.
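
As a sketch of how these values might be set, the fragment below extends the WithOptions callback from the configuration example and assigns each option its documented default. It assumes the timeout options are TimeSpan properties and that RetentionPeriodInMs is a numeric millisecond count; those types are not stated here, so check the options class before relying on them.

```csharp
// Fragment: belongs inside the .WithOptions(options => { ... }) callback
// shown in the configuration example.
// ASSUMPTION: timeouts are TimeSpan properties and RetentionPeriodInMs is a
// numeric millisecond count. Values mirror the documented defaults.
options.BrokerList = new[] { "localhost:8080" };
options.ConsumerGroupId = "orleans-kafka";
options.ConsumeMode = ConsumeMode.LastCommittedMessage;
options.PollTimeout = TimeSpan.FromMilliseconds(100);
options.PollBufferTimeout = TimeSpan.FromMilliseconds(500);
options.AdminRequestTimeout = TimeSpan.FromSeconds(5);
options.ProducerTimeout = TimeSpan.FromSeconds(5);
options.RetentionPeriodInMs = 7 * 24 * 60 * 60 * 1000; // 7 days in milliseconds
```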

Target frameworks

  • Compatible: net8.0
  • Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows

Version  Downloads  Last updated
2.0.0    110        8/4/2024
1.0.0    97         7/30/2024