Azure.Messaging.EventHubs.Processor
5.6.2
dotnet add package Azure.Messaging.EventHubs.Processor --version 5.6.2
NuGet\Install-Package Azure.Messaging.EventHubs.Processor -Version 5.6.2
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.6.2" />
paket add Azure.Messaging.EventHubs.Processor --version 5.6.2
#r "nuget: Azure.Messaging.EventHubs.Processor, 5.6.2"
// Install Azure.Messaging.EventHubs.Processor as a Cake Addin
#addin nuget:?package=Azure.Messaging.EventHubs.Processor&version=5.6.2
// Install Azure.Messaging.EventHubs.Processor as a Cake Tool
#tool nuget:?package=Azure.Messaging.EventHubs.Processor&version=5.6.2
Azure Event Hubs Event Processor client library for .NET
Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform, and store it by using any real-time analytics provider or with batching/storage adapters. If you would like to know more about Azure Event Hubs, you may wish to review: What is Event Hubs.
The Event Processor client library is a companion to the Azure Event Hubs client library, providing a stand-alone client for consuming events in a robust, durable, and scalable way that is suitable for the majority of production scenarios. An opinionated implementation built using Azure Storage blobs, the Event Processor is recommended for:
Reading and processing events across all partitions of an Event Hub at scale with resilience to transient failures and intermittent network issues.
Processing events cooperatively, where multiple processors dynamically distribute and share the responsibility in the context of a consumer group, gracefully managing the load as processors are added and removed from the group.
Managing checkpoints and state for processing in a durable manner using Azure Storage blobs as the underlying data store.
Source code | Package (NuGet) | API reference documentation | Product documentation
Getting started
Prerequisites
Azure Subscription: To use Azure services, including Azure Event Hubs, you'll need a subscription. If you do not have an existing Azure account, you may sign up for a free trial or use your Visual Studio Subscription benefits when you create an account.
Event Hubs namespace with an Event Hub: To interact with Azure Event Hubs, you'll also need to have a namespace and Event Hub available. If you are not familiar with creating Azure resources, you may wish to follow the step-by-step guide for creating an Event Hub using the Azure portal. There, you can also find detailed instructions for using the Azure CLI, Azure PowerShell, or Azure Resource Manager (ARM) templates to create an Event Hub.
Azure Storage account with blob storage: To persist checkpoints as blobs in Azure Storage, you'll need to have an Azure Storage account with blobs available. If you are not familiar with Azure Storage accounts, you may wish to follow the step-by-step guide for creating a storage account using the Azure portal. There, you can also find detailed instructions for using the Azure CLI, Azure PowerShell, or Azure Resource Manager (ARM) templates to create storage accounts.
C# 8.0: The Azure Event Hubs client library makes use of new features that were introduced in C# 8.0. In order to take advantage of the C# 8.0 syntax, it is recommended that you compile using the .NET Core SDK 3.0 or higher with a language version of latest. It is also possible to compile with the .NET Core SDK 2.1.x using a language version of preview.
Visual Studio users wishing to take full advantage of the C# 8.0 syntax will need to use Visual Studio 2019 or later. Visual Studio 2019, including the free Community edition, can be downloaded here. Users of Visual Studio 2017 can take advantage of the C# 8 syntax by making use of the Microsoft.Net.Compilers NuGet package and setting the language version, though the editing experience may not be ideal.
You can still use the library with previous C# language versions, but will need to manage asynchronous enumerable and asynchronous disposable members manually rather than benefiting from the new syntax. You may still target any framework version supported by your .NET Core SDK, including earlier versions of .NET Core or the .NET framework. For more information, see: how to specify target frameworks.
Important Note: In order to build or run the examples and the samples without modification, use of C# 8.0 is mandatory. You can still run the samples if you decide to tweak them for other language versions.
To quickly create the needed resources in Azure and to receive connection strings for them, you can deploy our sample template by clicking:
Install the package
Install the Azure Event Hubs Event Processor client library for .NET using NuGet:
dotnet add package Azure.Messaging.EventHubs.Processor
Authenticate the client
Obtain an Event Hubs connection string
For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize with it. The easiest means for doing so is to use a connection string, which is created automatically when creating an Event Hubs namespace. If you aren't familiar with using connection strings with Event Hubs, you may wish to follow the step-by-step guide to get an Event Hubs connection string.
Obtain an Azure Storage connection string
For the event processor client to make use of Azure Storage blobs for checkpointing, it will need to understand how to connect to a storage account and authorize with it. The most straightforward method of doing so is to use a connection string, which is generated at the time that the storage account is created. If you aren't familiar with storage account connection string authorization in Azure, you may wish to follow the step-by-step guide to configure Azure Storage connection strings.
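Once both connection strings are available, one common option is to keep them out of source code and read them at startup. The following is a minimal sketch of that idea, not part of the library; the Settings class and the two environment variable names are hypothetical and chosen only for illustration.

```csharp
using System;

public static class Settings
{
    // Hypothetical variable names; use whatever naming your deployment prefers.
    public const string EventHubsVariable = "EVENTHUBS_CONNECTION_STRING";
    public const string StorageVariable = "STORAGE_CONNECTION_STRING";

    // Reads a required setting from the environment and fails fast, with a
    // descriptive message, when it is missing or blank.
    public static string GetRequired(string variableName)
    {
        string value = Environment.GetEnvironmentVariable(variableName);

        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"The environment variable '{variableName}' is not set.");
        }

        return value;
    }
}
```

The returned values could then be used wherever the examples in this document show the "<< CONNECTION STRING ... >>" placeholders, for instance Settings.GetRequired(Settings.StorageVariable) for the storage account.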
Key concepts
An event processor is a construct intended to manage the responsibilities associated with connecting to a given Event Hub and processing events from each of its partitions, in the context of a specific consumer group. The act of processing events read from the partition and handling any errors that occur is delegated by the event processor to code that you provide, allowing your logic to concentrate on delivering business value while the processor handles the tasks associated with reading events, managing the partitions, and allowing state to be persisted in the form of checkpoints.
Checkpointing is a process by which readers mark and persist their position for events that have been processed for a partition. Checkpointing is the responsibility of the consumer and occurs on a per-partition basis, typically in the context of a specific consumer group. For the EventProcessorClient, this means that, for a consumer group and partition combination, the processor must keep track of its current position in the event stream. If you would like more information, please refer to checkpointing in the Event Hubs product documentation.
When an event processor connects, it will begin reading events at the checkpoint that was previously persisted by the last processor of that partition in that consumer group, if one exists. As an event processor reads and acts on events in the partition, it should periodically create checkpoints to both mark the events as "complete" for downstream applications and to provide resiliency should an event processor or the environment hosting it fail. Should it be necessary, it is possible to reprocess events that were previously marked as "complete" by specifying an earlier offset through this checkpointing process.
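As an illustration of periodic checkpointing, the sketch below writes a checkpoint after every N events seen on a partition. It assumes the ProcessEventArgs members HasEvent, Partition, CancellationToken, and UpdateCheckpointAsync; the CheckpointingHandler class, its RecordEvent helper, and the choice of N are hypothetical and not part of the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

// Hypothetical helper: counts events per partition and checkpoints every N events.
public class CheckpointingHandler
{
    private readonly int _eventsPerCheckpoint;
    private readonly ConcurrentDictionary<string, int> _eventsSinceCheckpoint =
        new ConcurrentDictionary<string, int>();

    public CheckpointingHandler(int eventsPerCheckpoint)
    {
        _eventsPerCheckpoint = eventsPerCheckpoint;
    }

    // Records one processed event for the partition; returns true when a
    // checkpoint is due, and resets that partition's counter.
    public bool RecordEvent(string partitionId)
    {
        int count = _eventsSinceCheckpoint.AddOrUpdate(
            partitionId, 1, (_, current) => current + 1);

        if (count < _eventsPerCheckpoint)
        {
            return false;
        }

        _eventsSinceCheckpoint[partitionId] = 0;
        return true;
    }

    public async Task ProcessEventAsync(ProcessEventArgs eventArgs)
    {
        try
        {
            // The handler may be invoked without an event, for example when
            // a maximum wait time has been configured.
            if (!eventArgs.HasEvent)
            {
                return;
            }

            // Application-specific processing of eventArgs.Data would go here.

            if (RecordEvent(eventArgs.Partition.PartitionId))
            {
                await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
            }
        }
        catch (Exception ex)
        {
            // Exceptions must not escape the handler; this sketch only logs them.
            Console.Error.WriteLine($"Event handler failed: {ex.Message}");
        }
    }
}
```

An instance could be attached with processor.ProcessEventAsync += handler.ProcessEventAsync. A larger N reduces writes to Azure Storage at the cost of reprocessing more events after a failure; the right trade-off depends on the application.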
A partition is an ordered sequence of events that is held in an Event Hub. Partitions are a means of data organization associated with the parallelism required by event consumers. Azure Event Hubs provides message streaming through a partitioned consumer pattern in which each consumer only reads a specific subset, or partition, of the message stream. As newer events arrive, they are added to the end of this sequence. The number of partitions is specified at the time an Event Hub is created and cannot be changed.
A consumer group is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate events.
For more concepts and deeper discussion, see: Event Hubs Features.
Client lifetime
The EventProcessorClient is safe to cache and use as a singleton for the lifetime of the application, which is best practice when events are being read regularly. The clients are responsible for efficient management of network, CPU, and memory use, working to keep usage low during periods of inactivity. Calling StopProcessingAsync or StopProcessing on the processor is required to ensure that network resources and other unmanaged objects are properly cleaned up.
Thread safety
We guarantee that all client instance methods are thread-safe and independent of each other (guideline). This ensures that the recommendation of reusing client instances is always safe, even across threads.
Additional concepts
Client options | Event handlers | Handling failures | Diagnostics | Mocking
Examples
Creating an Event Processor client
Since the EventProcessorClient has a dependency on Azure Storage blobs for persistence of its state, you'll need to provide a BlobContainerClient for the processor, which has been configured for the storage account and container that should be used.
// Requires: using Azure.Messaging.EventHubs; using Azure.Storage.Blobs;

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

// The container client gives the processor a place to persist checkpoints and ownership state.
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    eventHubsConnectionString,
    eventHubName
);
Configure the event and error handlers
In order to use the EventProcessorClient, handlers for event processing and errors must be provided. These handlers are considered self-contained and developers are responsible for ensuring that exceptions within the handler code are accounted for.
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event. This method
        // is intended for illustration and is not defined in this snippet.
        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error. This method
        // is intended for illustration and is not defined in this snippet.
        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
Start and stop processing
The EventProcessorClient will perform its processing in the background once it has been explicitly started and continue doing so until it has been explicitly stopped. While this allows the application code to perform other tasks, it also places on the application the responsibility of ensuring that the process does not terminate during processing if there are no other tasks being performed.
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.
    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.
    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}
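For a long-running console host, a fixed timeout is often replaced by waiting for a shutdown signal. The sketch below shows one way to keep the process alive until Ctrl+C is pressed, using only types from the .NET base class library; the Shutdown class and its method names are hypothetical and are not part of the Event Processor library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Shutdown
{
    // Cancels the given source the first time Ctrl+C is pressed and returns its token.
    // The source should stay undisposed for as long as the process may receive Ctrl+C.
    public static CancellationToken CancelOnCtrlC(CancellationTokenSource source)
    {
        Console.CancelKeyPress += (sender, args) =>
        {
            // Prevent immediate termination so that cleanup code can run.
            args.Cancel = true;
            source.Cancel();
        };

        return source.Token;
    }

    // Completes, without throwing, once the token has been canceled.
    public static async Task WaitForCancellationAsync(CancellationToken token)
    {
        try
        {
            await Task.Delay(Timeout.Infinite, token);
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is canceled.
        }
    }
}
```

With this helper, the application would call StartProcessingAsync, then await Shutdown.WaitForCancellationAsync(Shutdown.CancelOnCtrlC(cancellationSource)), and finally call StopProcessingAsync and remove the handlers as shown above.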
Using an Active Directory principal with the Event Processor client
The Azure Identity library provides Azure Active Directory authentication support which can be used for the Azure client libraries, including Event Hubs and Azure Storage.
To make use of an Active Directory principal, one of the available credentials from the Azure.Identity library is specified when creating the Event Hubs client. In addition, the fully qualified Event Hubs namespace and the name of the desired Event Hub are supplied in lieu of the Event Hubs connection string.
To make use of an Active Directory principal with Azure Storage blob containers, the fully qualified URL to the container must be provided when creating the storage client. Details about the valid URI formats for accessing Blob storage may be found in Naming and Referencing Containers, Blobs, and Metadata.
// Requires: using System; using Azure.Identity; using Azure.Messaging.EventHubs; using Azure.Storage.Blobs;

var credential = new DefaultAzureCredential();
var blobStorageUrl = "<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

// The same credential is used for both Azure Storage and Event Hubs.
var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);
When using Azure Active Directory with Event Hubs, your principal must be assigned a role which allows reading from Event Hubs, such as the Azure Event Hubs Data Receiver role. For more information about using Azure Active Directory authorization with Event Hubs, please refer to the associated documentation.
When using Azure Active Directory with Azure Storage, your principal must be assigned a role which allows read, write, and delete access to blobs, such as the Storage Blob Data Contributor role. For more information about using Active Directory authorization with Azure Storage, please refer to the associated documentation and the Azure Storage authorization sample.
Troubleshooting
Exception handling
Event Processor client exceptions
The Event Processor client makes every attempt to be resilient in the face of exceptions and will take the necessary actions to continue processing unless it is impossible to do so. No action from developers is needed for this to take place; it is natively part of the processor's behavior.
In order to allow developers the opportunity to inspect and react to exceptions that occur within the Event Processor client operations, they are surfaced via the ProcessErrorAsync event. The arguments for this event offer details about the exception and the context in which it was observed. Developers may perform normal operations on the Event Processor client from within this event handler, such as stopping and/or restarting it in response to errors, but may not otherwise influence the processor's exception behavior.
For a basic example of implementing the error handler, please see the sample: Event Processor Handlers.
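As a rough illustration of what an error handler can inspect, the sketch below formats the details carried by the event arguments and writes them to standard error. It assumes the PartitionId, Operation, and Exception members of ProcessErrorEventArgs; the ErrorHandlers class and its method names are hypothetical, and a real application would likely route this to its own logging system.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

public static class ErrorHandlers
{
    // Builds a one-line description of the error and the context it was observed in.
    public static string Describe(ProcessErrorEventArgs eventArgs)
    {
        // Assumption: PartitionId is null when the error is not tied to a specific partition.
        string partition = eventArgs.PartitionId ?? "(none)";

        return $"Operation '{eventArgs.Operation}' failed for partition '{partition}': " +
               $"{eventArgs.Exception?.Message}";
    }

    public static Task LogErrorAsync(ProcessErrorEventArgs eventArgs)
    {
        try
        {
            Console.Error.WriteLine(Describe(eventArgs));
        }
        catch (Exception ex)
        {
            // The error handler itself should not throw; this sketch only reports the failure.
            Console.Error.WriteLine($"Error handler failed: {ex.Message}");
        }

        return Task.CompletedTask;
    }
}
```

The handler could be attached with processor.ProcessErrorAsync += ErrorHandlers.LogErrorAsync before processing is started.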
Exceptions in event handlers
Because the Event Processor client lacks the appropriate context to understand the severity of exceptions within the event handlers that developers provide, it cannot assume what actions would be a reasonable response to them. As a result, developers are considered responsible for handling exceptions that occur within the event handlers they provide, using try/catch blocks and other standard language constructs.
The Event Processor client will not attempt to detect exceptions in developer code nor surface them explicitly. The resulting behavior will depend on the processor's hosting environment and the context in which the event handler was called. Because this may vary between different scenarios, it is strongly recommended that developers code their event handlers defensively and account for potential exceptions.
Exception details
For detailed information about exceptions that may occur, please refer to the Event Hubs client library README and the service documentation for Event Hubs messaging exceptions.
Logging and diagnostics
The Event Processor client library is fully instrumented for logging information at various levels of detail using the .NET EventSource to emit information. Logging is performed for each operation and follows the pattern of marking the starting point of the operation, its completion, and any exceptions encountered. Additional information that may offer insight is also logged in the context of the associated operation.
The Event Processor client logs are available to any EventListener by opting into the source named "Azure-Messaging-EventHubs-Processor-EventProcessorClient" or opting into all sources that have the trait "AzureEventSource". To make capturing logs from the Azure client libraries easier, the Azure.Core library used by Event Hubs offers an AzureEventSourceListener. More information can be found in the Azure.Core Diagnostics sample.
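Opting into the processor's source by name can be sketched with a small EventListener that uses only System.Diagnostics.Tracing. The ProcessorLogListener class name, the Informational level, and the console output format are choices made for this illustration; the source name is the one quoted above.

```csharp
using System;
using System.Diagnostics.Tracing;

public sealed class ProcessorLogListener : EventListener
{
    public const string ProcessorSourceName =
        "Azure-Messaging-EventHubs-Processor-EventProcessorClient";

    // Called for each event source in the process, including sources created later.
    protected override void OnEventSourceCreated(EventSource eventSource)
    {
        if (eventSource.Name == ProcessorSourceName)
        {
            EnableEvents(eventSource, EventLevel.Informational);
        }
    }

    // Called for each event written by a source that this listener enabled.
    protected override void OnEventWritten(EventWrittenEventArgs eventData)
    {
        Console.WriteLine(
            $"[{eventData.Level}] {eventData.EventSource.Name}: {eventData.EventName}");
    }
}
```

Creating an instance before starting the processor, and disposing of it when logs are no longer needed, would be enough to begin capturing events. Applications that want logs from all Azure client libraries may prefer the AzureEventSourceListener mentioned above.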
The Event Processor library is also instrumented for distributed tracing using Application Insights or OpenTelemetry. More information can be found in the Azure.Core Diagnostics sample.
Next steps
Beyond the scenarios discussed, the Azure Event Hubs Processor library offers support for additional scenarios to help take advantage of the full feature set of the EventProcessorClient. In order to help explore some of these scenarios, the Event Hubs Processor client library offers a project of samples to serve as an illustration for common scenarios. Please see the samples README for details.
Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
Please see our contributing guide for more information.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Azure.Core (>= 1.20.0)
  - Azure.Messaging.EventHubs (>= 5.6.2)
  - Azure.Storage.Blobs (>= 12.8.0)
  - Microsoft.Azure.Amqp (>= 2.5.6)
  - Microsoft.Bcl.AsyncInterfaces (>= 1.0.0)
  - System.Diagnostics.DiagnosticSource (>= 4.6.0)
  - System.Reflection.TypeExtensions (>= 4.5.1)
  - System.Threading.Channels (>= 4.6.0)
  - System.Threading.Tasks.Extensions (>= 4.5.4)
NuGet packages (30)
Showing the top 5 NuGet packages that depend on Azure.Messaging.EventHubs.Processor:
Package | Description |
---|---|
MassTransit.EventHub | MassTransit EventHub Support; MassTransit provides a developer-focused, modern platform for creating distributed applications without complexity. |
Microsoft.Azure.DurableTask.Netherite | Netherite orchestration service provider for the Durable Task Framework. |
Energinet.DataHub.Core.FunctionApp.TestCommon | [Release Notes](https://github.com/Energinet-DataHub/geh-core/blob/master/source/TestCommon/documents/release-notes/release-notes.md) [Documentation](https://github.com/Energinet-DataHub/geh-core/blob/master/source/TestCommon/documents/documentation.md) |
SlimMessageBus.Host.AzureEventHub | Azure Event Hubs provider for SlimMessageBus |
AzureSolutions.Helpers | This class library includes helper logic referenced by Azure Solutions documentation (https://richchapler.github.io/AzureSolutionsDocumentation/) and related solutions. |
GitHub repositories (11)
Showing the top 5 popular GitHub repositories that depend on Azure.Messaging.EventHubs.Processor:
Repository | Description |
---|---|
MassTransit/MassTransit | Distributed Application Framework for .NET |
Azure/azure-sdk-for-net | This repository is for active development of the Azure SDK for .NET. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/dotnet/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-net. |
dotnet/aspire | Tools, templates, and packages to accelerate building observable, production-ready apps |
phongnguyend/Practical.CleanArchitecture | Full-stack .Net 8 Clean Architecture (Microservices, Modular Monolith, Monolith), Blazor, Angular 19, React 18, Vue 3, BFF with YARP, Domain-Driven Design, CQRS, SOLID, Asp.Net Core Identity Custom Storage, OpenID Connect, Entity Framework Core, OpenTelemetry, SignalR, Hosted Services, Health Checks, Rate Limiting, Cloud Services (Azure, AWS, GCP). |
mspnp/cloud-design-patterns | Sample implementations for cloud design patterns found in the Azure Architecture Center. |
Version | Downloads | Last updated |
---|---|---|
5.12.0-beta.1 | 7,335 | 7 months ago |
5.11.5 | 474,242 | 4 months ago |
5.11.4 | 169,273 | 5 months ago |
5.11.3 | 414,151 | 7 months ago |
5.11.2 | 250,080 | 9 months ago |
5.11.1 | 341,307 | 10 months ago |
5.11.0 | 116,392 | 2/14/2024 |
5.10.0 | 900,908 | 11/8/2023 |
5.9.3 | 642,333 | 9/12/2023 |
5.9.2 | 690,914 | 6/6/2023 |
5.9.1 | 189,637 | 5/9/2023 |
5.9.0 | 291,394 | 4/11/2023 |
5.8.1 | 158,185 | 3/13/2023 |
5.8.0 | 12,377 | 3/7/2023 |
5.7.5 | 1,044,642 | 11/22/2022 |
5.7.4 | 201,371 | 11/8/2022 |
5.7.3 | 269,062 | 10/11/2022 |
5.7.2 | 485,793 | 8/9/2022 |
5.7.1 | 315,686 | 7/7/2022 |
5.7.0 | 460,910 | 5/10/2022 |
5.7.0-beta.5 | 7,585 | 4/5/2022 |
5.7.0-beta.4 | 845 | 3/11/2022 |
5.7.0-beta.3 | 1,039 | 2/9/2022 |
5.7.0-beta.2 | 8,502 | 1/13/2022 |
5.7.0-beta.1 | 3,415 | 11/9/2021 |
5.6.2 | 1,360,024 | 10/5/2021 |
5.6.1 | 218,472 | 9/8/2021 |
5.6.0 | 134,431 | 8/10/2021 |
5.5.0 | 251,134 | 7/7/2021 |
5.5.0-beta.1 | 1,080 | 6/8/2021 |
5.4.1 | 328,558 | 5/11/2021 |
5.4.0 | 218,056 | 4/5/2021 |
5.4.0-beta.1 | 396 | 3/17/2021 |
5.3.1 | 190,308 | 3/9/2021 |
5.3.0 | 116,798 | 2/9/2021 |
5.3.0-beta.4 | 16,300 | 11/10/2020 |
5.3.0-beta.3 | 4,315 | 9/30/2020 |
5.3.0-beta.1 | 1,932 | 9/15/2020 |
5.2.0 | 402,483 | 9/8/2020 |
5.2.0-preview.3 | 1,563 | 8/18/2020 |
5.2.0-preview.1 | 1,657 | 7/6/2020 |
5.1.0 | 378,819 | 5/5/2020 |
5.1.0-preview.1 | 468 | 4/6/2020 |
5.0.1 | 166,573 | 1/29/2020 |
5.0.0-preview.6 | 16,492 | 12/4/2019 |