KafkaConsumerRetry 0.1.3

There is a newer version of this package available.
See the version list below for details.
dotnet add package KafkaConsumerRetry --version 0.1.3                
NuGet\Install-Package KafkaConsumerRetry -Version 0.1.3                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="KafkaConsumerRetry" Version="0.1.3" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add KafkaConsumerRetry --version 0.1.3                
#r "nuget: KafkaConsumerRetry, 0.1.3"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install KafkaConsumerRetry as a Cake Addin
#addin nuget:?package=KafkaConsumerRetry&version=0.1.3

// Install KafkaConsumerRetry as a Cake Tool
#tool nuget:?package=KafkaConsumerRetry&version=0.1.3                

UBER style retry architecture

Features:

  • Multi cluster; retries can be on a different Kafka cluster. i.e. one that is in your control
  • Timeouts and Retries are configurable
  • Topic names are configurable
  • Restrict number of "messages at a time"
  • Partition "pausing" using the librdkafka API
  • Gracefully handles allocation/loss of a consumer's partitions

How do I use it?

Check out the ExampleProject in the root directory

Why use a retry?

Sometimes things don't work the first time, and that's ok. A good example is when two events at the same time tell a processor to either update or create an entity in a database.

sequenceDiagram
    participant A1 as Subscriber One
    participant DB as Database
    participant A2 as Subscriber Two
    A1->>DB: Have you got X?
    DB->>A1: No
    A2->>DB: Have you got X?
    DB->>A2: No
    A1->>DB: Insert X
    DB->>A1: Done
    A2->>DB: Insert X
    DB->>A2: Failure!
    note over A2: Delay Execution before retry
    A2->>DB: Have you got X?
    DB->>A2: Yes, here you go.
    note over A2: Check that X should be updated
    A2->>DB: Update X
    DB->>A2: Ok

Embracing the retry means not being too concerned about timing issues

The Happy/Sad Path of a failing message

sequenceDiagram
    participant OT as Origin Topic
    participant CG as Consumer Group
    participant RT as Retry Topic 1
    participant RT2 as Retry Topic 2 
    participant DL as Dead Letter Topic 
    OT->>CG: Receive Message
    note over CG: "Exception!"
    CG->>RT: Push failed message
    RT->>CG: Receive failed message
    note over CG: Processing delayed until retry delay time
    note over CG: "Exception!"
    CG->>RT2: Push failed message for 2nd time
    RT2->>CG: Receive failed message
    note over CG: Processing delayed until retry delay time
    note over CG: "Exception!"
    CG->>DL: Push message to DLQ

Notes

  • Embrace the retry and don't be too concerned if messages end up in the first retry topic.
  • If messages end up in the second or later topics then there might be an issue that needs looking at.
  • DLQ is the end of the line, so make sure it's monitored!
  • Retry times and retry delay are configurable per consumer
  • Messages are queued for processing per partition, therefore keeping ordering where needed.
  • Delayed retries do not block other messages from being processed as they are handled per partition and are outside of the rate limiter
  • In the case of retries, the messages on the same partition will have been added after the current message and so will need to wait at least the time the current message is waiting.
  • The limit of parallel processing is restricted by the configurable IRateLimiter
  • Exception information is carried along with the message that has failed. This can be viewed in the headers.
  • Message exception failures are retried per consumer group; the consumer group information is stored in the headers of the pushed messages.
  • If there are multiple consumer groups using the same retry topics, and more than one of those consumer groups fails, multiple copies of the message that failed will appear in the retry queue. This is normal and not a bug.

The original article text is in the local ARTICLE.md or on Uber

Main considerations that have been avoided by this library

Retry spamming

Retry spamming happens when a message is placed back into a queue when the time between attempts has not been reached. This is a bad idea generally as it can flood messages, while also reducing visibility.

Worker starvation due to retry waits

In most implementations the consume is called and a message is returned. This is then processed, and the cycle happens again.

With retries, often the thread is delayed until the message's retry time is up at which point the message is processed. This has a few of problems.

  • Not calling Consume regularly will drop a consumer out of the consumer group.
  • While the processor is waiting to consume, processor time is wasted.

Sometimes this is mitigated by pushing the message back onto the retry queue, but this then becomes Retry Spamming.

Too many threads doing something at the same time

Sometimes it's not a good idea to do as much as possible. For example, when connecting to databases there is usually a limit to the number of connections that a process can have.

Avoiding main topic retries

Pushing messages to the main topic for retries is dirty.

  • Reduces visibility of system functions e.g. A dev asking "why is this message here multiple times?"
  • You may not have the rights to push to the origin server topic
  • Services that are not part of your retry system may be looking at that topic and process the message again

Usage

This section might be out of date as this develops, so for the truth refer to the TestConsole.csproj


var retryServiceConfig = new KafkaRetryConfig {
        RetryAttempts = 3,
        RetryBaseTime = TimeSpan.FromSeconds(5),
        OriginCluster = new Dictionary<string, string> {
            ["group.id"] = "my-group-name",
            ["bootstrap.servers"] = "localhost:9092",
            ["client.id"] = "client-id",
            ["auto.offset.reset"] = "earliest",
            ["enable.auto.offset.store"] = "false", //Don't auto save the offset
            ["enable.auto.commit"] = "true" // Allow auto commit
        }
    };
var topicNaming = _naming.GetTopicNaming(originalName,retryServiceConfig);
await _consumerRunner.RunConsumersAsync<TestingResultHandler>(retryServiceConfig, topicNaming, cancellationToken);

Deep dive

Topic Naming

Naming of the topic

Class Usage
KafkaRetryConfig Connection settings for the Origin and Retry Kafka Clusters. If no retry cluster is specified, then the origin will be used for the retry topics
TopicNames Responsible for the naming of the retries and dlq
PartitionMessageManager Controls the actions of the workers. More info below

PartitionMessageManager

PartitionMessageManager handles messages from the subscribed topics. Internal message queues are updated on Assigned, Revoked and Lost partition events.

Incoming messages are added to a partition's work queue until a threshold is reached. When the threshold is reached, the Pause() action is called on that topic's partition.

The Pause() call is not passed to the server, but is used by the internal Kafka library librdkafka to stop the requesting of messages from that topic's partition.

Resume() is called when the work queue reaches zero.

Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.2.1 357 12/21/2022
0.1.4 569 12/15/2022
0.1.3 309 12/15/2022
0.1.2 338 12/14/2022
0.1.0 311 12/14/2022