Picton.Messaging
9.0.0
See the version list below for details.
dotnet add package Picton.Messaging --version 9.0.0
NuGet\Install-Package Picton.Messaging -Version 9.0.0
<PackageReference Include="Picton.Messaging" Version="9.0.0" />
paket add Picton.Messaging --version 9.0.0
#r "nuget: Picton.Messaging, 9.0.0"
// Install Picton.Messaging as a Cake Addin #addin nuget:?package=Picton.Messaging&version=9.0.0 // Install Picton.Messaging as a Cake Tool #tool nuget:?package=Picton.Messaging&version=9.0.0
Picton
About
Picton.Messaging is a C# library containing a high performance message processor (also known as a message "pump") designed to process messages from an Azure storage queue as efficiently as possible.
I created Picton.Mesaging because I needed a way to process a large volume of messages from Azure storage queues as quickly and efficiently as possible. I searched for a long time, but I could never find a solution that met all my requirements.
In March 2016 I attended three webinars by Daniel Marbach on "Async/Await & Task Parallel Library" (part 1, part 2 and part 3). Part 2 was particularly interresting to me because Daniel presented a generic message pump that meets most (but not all) of my requirements. Specifically, Daniel's message pump meets the following criteria:
- Concurrent message handling. This means that multiple messages can be processed at the same time. This is critical, especially if your message processing logic is I/O bound.
- Limiting concurrency. This means that we can decide the maximum number of messages that can be processed at the same time.
- Cancelling and graceful shutdown. This means that our message pump can be notified that we want to stop processing additional messages and also we can decide what to do with messages that are being processed at the moment when we decide to stop.
The sample code that Daniel shared during his webinars was very generic and not specific to Azure so I made the following enhancements:
- The messages are fetched from an Azure storage queue. We can specify which queue and it can even be a queue in the storage emulator.
- Backoff. When no messages are found in the queue, it's important to scale back and query the queue less often. As soon as new messages are available in the queue, we want to scale back up in order to process these messages as quickly as possible. I made two improvements to implemented this logic:
- first, introduce a pause when no message is found in the queue in order to reduce the number of queries to the storage queue. By default we pause for one second, but it's configurable. You could even eliminate the pause altogether but I do not recommend it.
- second, reduce the number of concurrent message processing tasks. This is the most important improvement introduced in the Picton library (in my humble opinion!). Daniel's sample code uses
SemaphoreSlim
to act as a "gatekeeper" and to limit the number of tasks that be be executed concurently. However, the number of "slots" permitted by the semaphore must be predetermined and is fixed. Picton eliminates this restriction and allows this number to be dynamically increased and decreased based on the presence or abscence of messages in the queue.
In December 2017 version 2.0 was released with a much more efficient method of fetching messages from the Azure queue: there is now a dedicated task for this pupose instead of allowing each individual concurent task to fetch their own messages. This means that the logic to increase/decrease the number of available slots in the SemaphoreSlim is no longer necessary and has ben removed.
Nuget
Picton.Messaging is available as a Nuget package.
Installation
The easiest way to include Picton.Messaging in your C# project is by grabing the nuget package:
PM> Install-Package Picton.Messaging
Once you have the Picton.Messaging library properly referenced in your project, modify your RoleEntryPoint like this example:
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Picton.Messaging;
using System;
using System.Diagnostics;
namespace WorkerRole1
{
public class MyWorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public override void Run()
{
Trace.TraceInformation("WorkerRole is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Use TLS 1.2 for Service Bus connections
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("WorkerRole has been started");
return result;
}
public override void OnStop()
{
Trace.TraceInformation("WorkerRole is stopping");
// Invoking "Cancel()" will cause the AsyncMessagePump to stop
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("WorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var connectionString = "<-- insert connection string for your Azure account -->";
var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time
// Configure the message pump
var options = new MessagePumpOptions(connectionString, concurrentTasks);
var messagePump = new AsyncMessagePump(options)
{
OnMessage = (queueName, message, cancellationToken) =>
{
// This is where you insert your custom logic to process a message
},
OnError = (queueName, message, exception, isPoison) =>
{
// Insert your custom error handling
// ==========================================================================
// Important note regarding "isPoison":
// --------------------------------------------------------------------------
// this parameter indicates whether this message has exceeded the maximum
// number of retries.
//
// When you have configured the "poison queue name" for the given queue and
// this parameter is "true", the message is automatically copied to the poison
// queue and removed from the original queue.
//
// If you have not configured the "poison queue name" for the given queue and
// this parameter is "true", the message is automatically removed from the
// original queue and you are responsible for storing the message. If you don't,
// this mesage will be lost.
// ==========================================================================
}
};
// Replace the following samples with the queues you want to monitor
messagePump.AddQueue("queue01", "queue01-poison", TimeSpan.FromMinutes(1), 3, "queue01-oversize-messages");
messagePump.AddQueue("queue02", "queue02-poison", TimeSpan.FromMinutes(1), 3, "queue02-oversize-messages");
messagePump.AddQueue("queue03", "queue03-poison", TimeSpan.FromMinutes(1), 3, "queue03-oversize-messages");
// Queues can share the same poison queue
messagePump.AddQueue("queue04", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue04-oversize-messages");
messagePump.AddQueue("queue05", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue05-oversize-messages");
// Queues can also share the same blob storage for messages that exceed the max size
messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
// Start the message pump
await messagePump.StartAsync(cancellationToken);
}
}
}
License
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.1 is compatible. |
.NET Framework | net48 is compatible. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETFramework 4.8
- App.Metrics (>= 4.3.0)
- Microsoft.Extensions.DependencyModel (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.1)
- Picton (>= 9.2.0)
- System.Threading.Channels (>= 8.0.0)
-
.NETStandard 2.1
- App.Metrics (>= 4.3.0)
- Microsoft.CSharp (>= 4.7.0)
- Microsoft.Extensions.DependencyModel (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.1)
- Picton (>= 9.2.0)
- System.Threading.Channels (>= 8.0.0)
-
net7.0
- App.Metrics (>= 4.3.0)
- Microsoft.Extensions.DependencyModel (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.1)
- Picton (>= 9.2.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.