NPipeline.Connectors.Azure.ServiceBus 0.50.0

There is a newer version of this package available.
See the version list below for details.

NPipeline Azure Service Bus Connector

Azure Service Bus connector for NPipeline - integrate with Microsoft Azure Service Bus for enterprise-grade message queuing and pub/sub messaging.

Features

  • Queue & Topic/Subscription Source Nodes: Consume messages from queues or topic subscriptions with type-safe JSON deserialization
  • Queue & Topic Sink Nodes: Publish messages to queues or topics with batched sending
  • Session Support: First-class support for session-enabled queues and subscriptions via ServiceBusSessionSourceNode
  • Explicit Settlement: Full access to Complete, Abandon, Dead-Letter, and Defer operations via ServiceBusMessage<T>
  • Message Lock Renewal: Automatic lock renewal during long-running processing
  • Multiple Auth Modes: Connection string, Azure AD (Managed Identity / DefaultAzureCredential), and named connections
  • Acknowledgment Strategies: AutoOnSinkSuccess, Manual, and None - with idempotent settlement
  • Dead-Letter Routing: Automatic dead-lettering of deserialization failures
  • Retry Configuration: Exponential and fixed-mode retry with configurable delay/timeout
  • Channel Bridge: Push-to-pull bridge using System.Threading.Channels for backpressure-aware processing
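The channel bridge idea can be illustrated with nothing but System.Threading.Channels. The following is a minimal sketch, not the connector's actual implementation: the OnMessageAsync callback, the capacity of 2, and the message bodies are all hypothetical stand-ins. It shows how a bounded channel turns a push-style callback into a pull-style stream, with the writer waiting whenever the reader falls behind.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel: once 2 items are buffered, WriteAsync waits for the reader.
// That wait is the backpressure applied to the push side.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 2)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,
});

// Push side: hypothetical stand-in for a message-received callback.
async Task OnMessageAsync(string body) => await channel.Writer.WriteAsync(body);

// Simulate five pushed messages, then signal that no more will arrive.
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 5; i++)
        await OnMessageAsync($"order-{i}");
    channel.Writer.Complete();
});

// Pull side: the consumer reads at its own pace until the channel completes.
int received = 0;
await foreach (var body in channel.Reader.ReadAllAsync())
{
    received++;
    Console.WriteLine($"pulled {body}");
}

await producer;
```

A small capacity keeps memory bounded at the cost of stalling the producer sooner; a real bridge would size it relative to settings such as PrefetchCount.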

Installation

dotnet add package NPipeline.Connectors.Azure.ServiceBus

Quick Start

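using NPipeline.Connectors.Azure.ServiceBus.Configuration;
using NPipeline.Connectors.Azure.ServiceBus.Nodes;

// Source: consume from a queue
var sourceConfig = new ServiceBusConfiguration
{
    ConnectionString = "Endpoint=sb://...",
    QueueName = "orders",
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,
};

// Sink: publish to a queue (the queue name here is illustrative)
var sinkConfig = new ServiceBusConfiguration
{
    ConnectionString = "Endpoint=sb://...",
    QueueName = "processed-orders",
};

// `builder` is the NPipeline pipeline builder provided by your pipeline definition
var source = builder.AddSource(new ServiceBusQueueSourceNode<Order>(sourceConfig), "sb-source");
var sink = builder.AddSink(new ServiceBusQueueSinkNode<ProcessedOrder>(sinkConfig), "sb-sink");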

Configuration

var config = new ServiceBusConfiguration
{
    // ── Connection ──────────────────────────────────────────────────────────────
    ConnectionString = "<connection-string>",
    // Or for Azure AD authentication:
    // AuthenticationMode = AzureAuthenticationMode.AzureAdCredential,
    // FullyQualifiedNamespace = "my-namespace.servicebus.windows.net",
    // Credential = new DefaultAzureCredential(),

    // ── Source Options ──────────────────────────────────────────────────────────
    QueueName = "my-queue",               // Queue source/sink
    // TopicName = "my-topic",            // For topic sink
    // SubscriptionName = "my-sub",       // For subscription source
    MaxConcurrentCalls = 5,               // Parallel message handlers (default: 1)
    PrefetchCount = 20,                   // Pre-fetch buffer (default: 0)
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),

    // ── Sink Options ──────────────────────────────────────────────────────────
    EnableBatchSending = true,            // Use ServiceBusMessageBatch (default: true)
    BatchSize = 100,                      // Max messages per batch (default: 100)

    // ── Acknowledgment ────────────────────────────────────────────────────────
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,

    // ── Session Options (session-enabled entities only) ───────────────────────
    EnableSessions = false,               // Set to true for ServiceBusSessionSourceNode
    MaxConcurrentSessions = 8,
    SessionMaxConcurrentCallsPerSession = 1,

    // ── Error Handling ────────────────────────────────────────────────────────
    ContinueOnDeserializationError = false,
    DeadLetterOnDeserializationError = true,
    ContinueOnError = true,               // Sink errors (default: true)

    // ── Retry ─────────────────────────────────────────────────────────────────
    Retry = new ServiceBusRetryConfiguration
    {
        Mode = ServiceBusRetryMode.Exponential,
        MaxRetries = 3,
        Delay = TimeSpan.FromSeconds(1),
        MaxDelay = TimeSpan.FromSeconds(30),
    },
};
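To make the Retry settings above concrete, here is a rough sketch of how an exponential schedule bounded by MaxDelay is commonly computed. It assumes the wait doubles on each attempt starting from Delay; the BackoffDelay helper is hypothetical, and the connector or the underlying Azure client may apply jitter or a different growth factor.

```csharp
using System;

// Hypothetical helper: wait time before the given retry attempt (1-based),
// doubling from `delay` on each attempt and never exceeding `maxDelay`.
static TimeSpan BackoffDelay(int attempt, TimeSpan delay, TimeSpan maxDelay)
{
    double factor = Math.Pow(2, attempt - 1);
    double milliseconds = Math.Min(delay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
    return TimeSpan.FromMilliseconds(milliseconds);
}

// Values taken from the configuration example: Delay = 1s, MaxDelay = 30s, MaxRetries = 3.
var baseDelay = TimeSpan.FromSeconds(1);
var cap = TimeSpan.FromSeconds(30);

for (int attempt = 1; attempt <= 3; attempt++)
{
    var wait = BackoffDelay(attempt, baseDelay, cap);
    Console.WriteLine($"retry {attempt}: wait {wait.TotalSeconds}s");
}
```

Under these assumptions the three retries wait 1s, 2s, and 4s; the 30s cap would only come into play from the sixth attempt onward.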

Settlement

Each message received by a source node is wrapped in a ServiceBusMessage<T> that exposes explicit settlement:

// In a transform node:
await message.CompleteAsync();    // Remove from queue
await message.AbandonAsync();     // Return to queue for redelivery
await message.DeadLetterAsync("Reason", "Description"); // Move to DLQ
await message.DeferAsync();       // Defer (receive later by sequence number)

// Via the IAcknowledgableMessage interface:
await message.AcknowledgeAsync();             // → CompleteAsync()
await message.NegativeAcknowledgeAsync();     // → AbandonAsync() (requeue=true)
await message.NegativeAcknowledgeAsync(false); // → DeadLetterAsync()

Settlement is idempotent: calling any settlement method multiple times is safe, and only the first call takes effect.
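One common way to get this first-call-wins behavior is an atomic flag shared by all settlement operations. The sketch below is an assumption about the general technique, not the connector's code; CreateSettlementGate and the counters are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: returns a gate that runs at most one settlement action.
// The returned delegate yields true if it ran the action, false if a previous
// call had already settled the message.
static Func<Func<Task>, Task<bool>> CreateSettlementGate()
{
    int settled = 0; // 0 = not settled yet, 1 = settled
    return async action =>
    {
        // Atomically flip 0 -> 1; only the caller that saw 0 may proceed.
        if (Interlocked.CompareExchange(ref settled, 1, 0) != 0)
            return false;
        await action();
        return true;
    };
}

int completes = 0, abandons = 0;
var gate = CreateSettlementGate();

// The first settlement wins; the later Abandon is silently ignored.
bool ranComplete = await gate(() => { completes++; return Task.CompletedTask; });
bool ranAbandon = await gate(() => { abandons++; return Task.CompletedTask; });

Console.WriteLine($"complete ran: {ranComplete}, abandon ran: {ranAbandon}");
```

Note that in this sketch the flag stays set even if the first action throws, so a failed settlement is not retried; a production implementation would need to decide how to handle that case.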

Dependency Injection

services.AddServiceBusConnector(options =>
{
    options.ConnectionString = configuration["ServiceBus:ConnectionString"];
});

// Register individual nodes
services.AddServiceBusQueueSource<Order>("orders", config =>
{
    config.MaxConcurrentCalls = 10;
});

services.AddServiceBusQueueSink<ProcessedOrder>("processed-orders");

Session-Aware Processing

var config = new ServiceBusConfiguration
{
    ConnectionString = "...",
    QueueName = "session-queue",
    EnableSessions = true,
    MaxConcurrentSessions = 4,
    SessionMaxConcurrentCallsPerSession = 1,
    SessionIdleTimeout = TimeSpan.FromMinutes(2),
};

var source = new ServiceBusSessionSourceNode<Order>(config);

Dead-Letter Queue Reading

var config = new ServiceBusConfiguration
{
    ConnectionString = "...",
    QueueName = "my-queue",
    SubQueue = SubQueue.DeadLetter,  // Read from DLQ
};

Documentation

For comprehensive documentation, see Azure Service Bus Connector Documentation.

Requirements

  • .NET 8.0, 9.0, or 10.0
  • Azure.Messaging.ServiceBus 7.20.1+ (automatically included)
  • Azure.Identity 1.18.0+ (automatically included)
  • NPipeline.Connectors.Azure (automatically included)

License

This package is licensed under the Business Source License 1.1.

Free for non-production use. Production use is free for organizations with 4 or fewer developers and annual revenue of $5M AUD or less. Larger organizations require a commercial license. This license automatically converts to MIT two years after each release.

Compatible and additional computed target framework versions

Product: .NET
Compatible: net8.0, net9.0, net10.0
Computed (for each of net8.0, net9.0, and net10.0): -android, -browser, -ios, -maccatalyst, -macos, -tvos, -windows

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.52.0 84 5/30/2026
0.51.1 92 5/29/2026
0.51.0 87 5/29/2026
0.50.0 104 5/29/2026
0.49.3 100 5/28/2026
0.49.2 91 5/27/2026
0.49.1 91 5/27/2026
0.49.0 96 5/25/2026
0.48.3 91 5/22/2026
0.48.2 94 5/19/2026
0.48.1 96 5/17/2026
0.48.0 86 5/17/2026
0.47.0 96 5/16/2026
0.46.0 90 5/16/2026
0.45.0 113 5/15/2026
0.44.0 91 5/14/2026
0.43.0 99 5/14/2026
0.42.0 99 5/8/2026
0.41.0 93 4/30/2026
0.40.3 101 4/27/2026