JN.RabbitMQClient
2.5.1
dotnet add package JN.RabbitMQClient --version 2.5.1
NuGet\Install-Package JN.RabbitMQClient -Version 2.5.1
<PackageReference Include="JN.RabbitMQClient" Version="2.5.1" />
paket add JN.RabbitMQClient --version 2.5.1
#r "nuget: JN.RabbitMQClient, 2.5.1"
// Install JN.RabbitMQClient as a Cake Addin #addin nuget:?package=JN.RabbitMQClient&version=2.5.1 // Install JN.RabbitMQClient as a Cake Tool #tool nuget:?package=JN.RabbitMQClient&version=2.5.1
JN.RabbitMQClient
Simple implementation of RabbitMQ consumer and sender.
Features
- Sender implementation
- Multiple consumer instances supported
- Multiple processing options for received messages
- Random expiration for messages sent to an holding queue (depending on the processing option)
- TLS connection support
- Limiter for message processing
- Message properties for more advanced scenarios such as queues with support for priority messages, messages Headers, etc.
More details available on the project website.
Install
Download the package from NuGet:
Install-Package JN.RabbitMQClient -Version [version number]
Usage
First, you must create the RabbitMqConsumerService
and then define delegates for ReceiveMessage
, ShutdownConsumer
and ReceiveMessageError
. The service will start the required number of consumers when StartConsumers
is called.
To use a retry queue, the method StartConsumers
should be called with a RetryQueueDetails
object.
Message processing instructions
OK
- message is considered as successfully processed
RequeueMessageWithDelay
- message is removed from the queue, but sent to a holding queue for later processing (typically with a dead letter configuration)
IgnoreMessage
- message is removed from the queue and ignored
IgnoreMessageWithRequeue
- message is rejected and sent back to the queue
Example
Example for consumer and sender services:
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
// consumer
// you can also use extensions AddConsumersService() and AddSenderService() (in namespace JN.RabbitMQClient.Extensions)
var consumerService = new RabbitMqConsumerService(GetBrokerConfigConsumers());
consumerService.ReceiveMessage += ReceiveMessage;
consumerService.ShutdownConsumer += ShutdownConsumer;
consumerService.ReceiveMessageError += ReceiveMessageError;
consumerService.MaxChannelsPerConnection = 5;
consumerService.ConsumersPrefetch = 2;
consumerService.ServiceDescription = "test consumer service";
consumerService.StartConsumers("my consumer");
// sender
var senderService = new RabbitMqSenderService(GetBrokerConfigSender());
IMessageProperties properties = new MessageProperties { Priority = 3 };
var result = senderService.Send("my message", properties);
Console.WriteLine($"Send result: {result.Success}");
Console.WriteLine(result.ReturnedObject != null
? $"Queue details. ConsumerCount: {result.ReturnedObject.ConsumerCount}; MessageCount: {result.ReturnedObject.MessageReadyCount}"
: $"Unable to obtain queue details. Reason: {result.ErrorDescription}");
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
consumerService.Dispose();
}
private static IBrokerConfigSender GetBrokerConfigSender()
{
IBrokerConfigSender configSender = new BrokerConfigSender
{
Username = "test",
Password = "123",
Host = hostName,
VirtualHost = "MyVirtualHost",
RoutingKeyOrQueueName = "MyTestQueue",
KeepConnectionOpen = true,
GetQueueInfoOnSend = true
};
return configSender;
}
private static IBrokerConfigConsumers GetBrokerConfigConsumers()
{
IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers
{
Username = "test",
Password = "123",
Host = hostName,
VirtualHost = "MyVirtualHost",
RoutingKeyOrQueueName = "MyTestQueue",
ShuffleHostList = false,
Port = 0,
TotalInstances = 4
};
return configConsumers;
}
private static async Task ReceiveMessageError(string routingKeyOrQueueName, string consumerTag, string exchange, string message, string errorMessage)
{
await Console.Out.WriteLineAsync($"Error: '{consumerTag}' | Queued message: {message} | Error message: {errorMessage}").ConfigureAwait(false);
}
private static async Task ShutdownConsumer(string consumerTag, ushort errorCode, string shutdownInitiator, string errorMessage)
{
await Console.Out.WriteLineAsync($"Shutdown '{consumerTag}' | {errorCode} | {shutdownInitiator} | {errorMessage}").ConfigureAwait(false);
}
private static async Task<MessageProcessInstruction> ReceiveMessage(string routingKeyOrQueueName, string consumerTag, long firstErrorTimestamp, string exchange, string message, string additionalInfo, IMessageProperties properties)
{
var priorityReceived = properties.Priority;
var newPriority = (byte)(priorityReceived <= 3 ? 5 : priorityReceived);
await Console.Out.WriteLineAsync($"Message received by '{consumerTag}' from queue '{routingKeyOrQueueName}': {message}; Priority received: {properties.Priority} ").ConfigureAwait(false);
return new MessageProcessInstruction
{
Value = Constants.MessageProcessInstruction.OK,
Priority = newPriority,
AdditionalInfo = "id: 123"
};
}
}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 is compatible. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETFramework 4.6.2
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 8.0.0)
- RabbitMQ.Client (>= 6.8.1)
-
.NETStandard 2.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 8.0.0)
- RabbitMQ.Client (>= 6.8.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
2.5.1 | 122 | 2/16/2024 |
2.5.0 | 134 | 2/14/2024 |
2.4.3.1 | 457 | 11/2/2022 |
2.4.3 | 494 | 7/19/2022 |
2.4.2 | 450 | 5/23/2022 |
2.4.1 | 434 | 4/20/2022 |
2.4.0 | 606 | 2/7/2022 |
2.3.4 | 302 | 12/28/2021 |
2.3.3 | 401 | 10/14/2021 |
2.3.2 | 329 | 3/17/2021 |
2.3.0 | 524 | 9/4/2020 |
2.2.1 | 540 | 6/6/2020 |
2.2.0-beta.1 | 306 | 5/24/2020 |
2.2.0-beta | 325 | 5/15/2020 |
2.1.1 | 471 | 4/14/2020 |
2.1.0 | 497 | 12/19/2019 |
2.0.0 | 496 | 11/16/2019 |
1.0.1 | 535 | 6/30/2019 |
1.0.0 | 609 | 4/23/2019 |
Release Notes for current version 2.5.1:
- Minor bug fixes
Release Notes for current version 2.5.0:
- Update RabbitMQ.Client to latest version
- Update minimum targetFramework to .NETFramework 4.6.2
- Added return object for 'Send' in the SenderService
- Added the possibility of returning the number of clients connected to a queue and the total number of ready messages as a response in the 'Send' method in SenderService. The functionality must be activated in the 'GetQueueInfoOnSend' property of the configuration.
- Added method 'GetQueueInfo' in the SenderService
- Minor bug fixes
Release Notes for version 2.4.3.1:
- Added connection timeout
Release Notes for version 2.4.3:
- Added service extensions AddConsumersService() and AddSenderService() (in namespace JN.RabbitMQClient.Extensions)
- Added ConnectionDetails readonly property
- Renamed property GetTotalConsumers to TotalConsumers
- Renamed property GetTotalRunningConsumers to TotalRunningConsumers
- Updated RabbitMQ.Client to the latest version
Release Notes for version 2.4.2:
- Update RabbitMQ.Client to latest version
- Added ConsumersPrefetch property (in consumer service)
- Bug fixes
Release Notes for version 2.4.1:
- Update RabbitMQ.Client to latest version
- Bug fixes
Release Notes for version 2.4.0:
- Added support for message properties (in sender and consumer classes); messages can now be sent for more advanced scenarios, such as queues with support for priority messages, messages Headers, etc.
- Merged sender classes; feature for keep connection open was imported to the main sender class
- Changed type for MaxChannelsPerConnection property (in consumer service)
- Bug fixes
Release Notes for version 2.3.4:
- Added support for additional information to be passed to the processing delegate; the processing instruction is now an object 'MessageProcessInstruction' where that additional information can be passed. Useful for when a message is requeued with delay to pass information to the next processing attempt.
Release Notes for version 2.3.3:
- Update target frameworks; added .NETFramework4.6.1
- Update RabbitMQ.Client to latest version
- Update consumer to expose MaxChannelsPerConnection property
Release Notes for version 2.3.2:
- Added sender service that keeps connection open.
Release Notes for version 2.3.0:
- Update RabbitMQ.Client to latest version
- Changed namespace for IRabbitMqConsumerService and IRabbitMqSenderService
- Changed behavior for StopConsumers(consumerTag) - now stops all consumers with tag starting with 'consumerTag'
- Added limiter feature for processing messages.