KafkaConsumerRetry 0.1.2

dotnet add package KafkaConsumerRetry --version 0.1.2                

Uber-style retry architecture

Features:

  • Multi-cluster: retries can go to a different Kafka cluster, i.e. one that is under your control
  • Timeouts and retries are configurable
  • Topic names are configurable
  • Restrict number of "messages at a time"
  • Partition "pausing" using the librdkafka API
  • Gracefully handles allocation/loss of a consumer's partitions

How it works

This library implements a retry system by using retry topics.

The original article text is in the local ARTICLE.md or on Uber's website.

Main problems this library avoids

Retry spamming

Retry spamming happens when a message is put back onto a queue before the wait between attempts has elapsed, so it is consumed and requeued repeatedly. This is generally a bad idea: it floods the queue with messages and reduces visibility.

Worker starvation due to retry waits

In most implementations, Consume is called and a message is returned. The message is processed, and the cycle repeats.

With retries, the thread is often delayed until the message's retry time is up, at which point the message is processed. This has a few problems:

  • Not calling Consume regularly will drop a consumer out of the consumer group.
  • While the worker waits for the retry time, it does no useful work.

Sometimes this is mitigated by pushing the message back onto the retry queue, but this then becomes retry spamming.
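One way to wait for a retry time without blocking the consume loop is to park the message together with its due time and check for due messages on each pass. The sketch below illustrates that idea only; `RetryDelayBuffer` is a hypothetical helper, not a type from this library, and the library's own mechanism may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of KafkaConsumerRetry.
public sealed class RetryDelayBuffer<T>
{
    private readonly List<(T Message, DateTimeOffset DueAt)> _pending = new();

    public int Count => _pending.Count;

    // Park a message until its retry time instead of sleeping on it.
    public void Hold(T message, DateTimeOffset dueAt) => _pending.Add((message, dueAt));

    // Call on every pass of the consume loop; returns messages whose wait has elapsed.
    public IReadOnlyList<T> TakeDue(DateTimeOffset now)
    {
        var due = new List<T>();
        for (var i = _pending.Count - 1; i >= 0; i--)
        {
            if (_pending[i].DueAt <= now)
            {
                due.Add(_pending[i].Message);
                _pending.RemoveAt(i);
            }
        }
        due.Reverse(); // the loop ran backwards, so restore arrival order
        return due;
    }
}
```

Because the loop never sleeps, Consume can still be called regularly while messages wait. The class is not thread-safe; it assumes a single consume loop owns it.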

Too many threads doing something at the same time

Sometimes it's not a good idea to do as much as possible. For example, when connecting to databases there is usually a limit to the number of connections that a process can have.
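A minimal sketch of capping concurrent work with `SemaphoreSlim` from the .NET base class library. `BoundedProcessing` and its parameters are hypothetical names for illustration; this is not the library's implementation of its "messages at a time" limit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of KafkaConsumerRetry.
public static class BoundedProcessing
{
    // Runs handler for every message with at most maxConcurrency handlers in flight.
    public static async Task RunAsync<T>(
        IEnumerable<T> messages, int maxConcurrency, Func<T, Task> handler)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();
        foreach (var message in messages)
        {
            await gate.WaitAsync(); // waits asynchronously while the limit is reached
            tasks.Add(Task.Run(async () =>
            {
                try { await handler(message); }
                finally { gate.Release(); } // free the slot even if the handler throws
            }));
        }
        await Task.WhenAll(tasks);
    }
}
```

With `maxConcurrency` set to the size of a database connection pool, for example, the handlers would never hold more connections than the pool allows.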

Avoiding main topic retries

Pushing messages back to the main topic for retries is dirty:

  • It reduces visibility into what the system is doing, e.g. a developer asking "why is this message here multiple times?"
  • You may not have the rights to push to the topic on the origin server
  • Services that are not part of your retry system may be consuming that topic and would process the message again

Usage

This section may fall out of date as the library develops, so treat the TestConsole project (TestConsole.csproj) as the source of truth.


// _naming and _consumerRunner are assumed to be injected services;
// originalName is the origin topic name and cancellationToken stops the consumers.
var retryServiceConfig = new KafkaRetryConfig {
    RetryAttempts = 3,
    RetryBaseTime = TimeSpan.FromSeconds(5),
    OriginCluster = new Dictionary<string, string> {
        ["group.id"] = "my-group-name",
        ["bootstrap.servers"] = "localhost:9092",
        ["client.id"] = "client-id",
        ["auto.offset.reset"] = "earliest",
        ["enable.auto.offset.store"] = "false", // Do not store offsets automatically
        ["enable.auto.commit"] = "true" // Commit stored offsets automatically
    }
};
var topicNaming = _naming.GetTopicNaming(originalName, retryServiceConfig);
await _consumerRunner.RunConsumersAsync<TestingResultHandler>(retryServiceConfig, topicNaming, cancellationToken);

Deep dive

Topic Naming

Naming of the topic

Class | Usage
--- | ---
KafkaRetryConfig | Connection settings for the Origin and Retry Kafka clusters. If no retry cluster is specified, the origin cluster is used for the retry topics
TopicNames | Responsible for the naming of the retry topics and the DLQ
PartitionMessageManager | Controls the actions of the workers. More info below
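To make the naming role concrete, here is a minimal sketch that derives retry and dead-letter topic names from an origin topic. Everything in it is an assumption: the `SketchTopicNames` record, the `.retry.N` and `.dlq` suffixes, and the choice of one retry topic per attempt are illustrative and may not match what the library's TopicNames produces.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape for illustration; the library's TopicNames type may differ.
public sealed record SketchTopicNames(
    string Origin, IReadOnlyList<string> Retries, string DeadLetter);

public static class SketchTopicNaming
{
    // Assumes one retry topic per attempt and made-up ".retry.N" / ".dlq" suffixes.
    public static SketchTopicNames For(string origin, int retryAttempts)
    {
        if (retryAttempts < 0)
            throw new ArgumentOutOfRangeException(nameof(retryAttempts));

        var retries = new List<string>();
        for (var attempt = 0; attempt < retryAttempts; attempt++)
            retries.Add($"{origin}.retry.{attempt}");

        return new SketchTopicNames(origin, retries, $"{origin}.dlq");
    }
}
```

Keeping the naming in one place means a consumer and any tooling that inspects the retry topics agree on the names.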

PartitionMessageManager

PartitionMessageManager handles messages from the subscribed topics. Internal message queues are updated on Assigned, Revoked and Lost partition events.

Incoming messages are added to a partition's work queue until a threshold is reached. When the threshold is reached, the Pause() action is called on that topic's partition.

The Pause() call is not sent to the server; it is used by librdkafka, the underlying Kafka client library, to stop fetching messages from that topic's partition.

Resume() is called when the work queue is empty again.
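The threshold behaviour described above can be sketched as a small per-partition queue. `PartitionWorkQueue` is a hypothetical type, not the library's PartitionMessageManager; the `pause` and `resume` callbacks stand in for calls to the consumer's Pause() and Resume() for that topic partition.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

// Hypothetical type for illustration; not thread-safe, one owner per partition assumed.
public sealed class PartitionWorkQueue<T>
{
    private readonly Queue<T> _queue = new();
    private readonly int _threshold;
    private readonly Action _pause;
    private readonly Action _resume;

    public PartitionWorkQueue(int threshold, Action pause, Action resume)
    {
        if (threshold < 1) throw new ArgumentOutOfRangeException(nameof(threshold));
        _threshold = threshold;
        _pause = pause;
        _resume = resume;
    }

    public bool IsPaused { get; private set; }

    // Adds a message; pauses the partition once the threshold is reached.
    public void Enqueue(T message)
    {
        _queue.Enqueue(message);
        if (!IsPaused && _queue.Count >= _threshold)
        {
            IsPaused = true;
            _pause();
        }
    }

    // Removes a message; resumes the partition once the queue is empty.
    public bool TryDequeue([MaybeNullWhen(false)] out T message)
    {
        if (!_queue.TryDequeue(out message)) return false;
        if (IsPaused && _queue.Count == 0)
        {
            IsPaused = false;
            _resume();
        }
        return true;
    }
}
```

Resuming only at zero, rather than just below the threshold, avoids flipping between paused and resumed on every message.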

Target frameworks

The package targets net6.0. NuGet computes compatibility with the net6.0 platform-specific targets (android, ios, maccatalyst, macos, tvos, windows) and with net7.0 and net8.0 and their platform-specific targets (the same platforms, plus browser for net8.0).

Version | Downloads | Last updated
--- | --- | ---
0.2.1 | 357 | 12/21/2022
0.1.4 | 569 | 12/15/2022
0.1.3 | 309 | 12/15/2022
0.1.2 | 338 | 12/14/2022
0.1.0 | 311 | 12/14/2022