DNV.Dapr.PubSub 2.1.0-preview.1756370241

This is a prerelease version of DNV.Dapr.PubSub.
There is a newer version of this package available.
See the version list below for details.
dotnet add package DNV.Dapr.PubSub --version 2.1.0-preview.1756370241
                    
NuGet\Install-Package DNV.Dapr.PubSub -Version 2.1.0-preview.1756370241
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DNV.Dapr.PubSub" Version="2.1.0-preview.1756370241" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DNV.Dapr.PubSub" Version="2.1.0-preview.1756370241" />
                    
Directory.Packages.props
<PackageReference Include="DNV.Dapr.PubSub" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DNV.Dapr.PubSub --version 2.1.0-preview.1756370241
                    
#r "nuget: DNV.Dapr.PubSub, 2.1.0-preview.1756370241"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DNV.Dapr.PubSub@2.1.0-preview.1756370241
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DNV.Dapr.PubSub&version=2.1.0-preview.1756370241&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=DNV.Dapr.PubSub&version=2.1.0-preview.1756370241&prerelease
                    
Install as a Cake Tool

DNV.Dapr.PubSub

A comprehensive Dapr Pub/Sub client library that provides simplified APIs for publishing messages to topics with full HttpClient injection support, factory pattern, and bulk publishing capabilities.

Overview

This library provides two main components:

  • DNV.Dapr.PubSub.Abstractions: Core abstractions and interfaces
  • DNV.Dapr.PubSub: Default implementation with HttpClient injection support

Why This Implementation?

The main purpose of this DaprPubSubClient implementation is to enable HttpClient injection for Pub/Sub operations, which the official Dapr SDK does not natively support. This is crucial for:

  • HttpClient Lifecycle Management: Proper disposal and connection pooling through DI container
  • Custom HttpClient Configuration: Timeouts, headers, handlers, and other HTTP-specific settings for pub/sub operations
  • Testing: Easy mocking and testing with injected HttpClient instances
  • Observability: Integration with logging, tracing, and monitoring through HttpClient handlers
  • Corporate Policies: Meeting enterprise requirements for HTTP client management and security
  • Named Client Support: Multiple pub/sub configurations for different scenarios

Key Features

  • Injectable HttpClient: Full control over HttpClient configuration and lifecycle for pub/sub operations
  • Named Client Support: Create multiple pub/sub clients with different configurations
  • Factory Pattern: IDaprPubSubClientFactory for flexible client creation and management
  • Configuration Binding: Support for IConfiguration integration with strongly-typed options
  • Default Topic Support: Configure default topics for simplified publishing
  • Metadata Support: Attach custom metadata to published messages
  • Bulk Publishing: Efficient bulk message publishing with BulkPublishAsync
  • Async/Await Support: Full asynchronous operation support throughout
  • Exception Handling: Leverages underlying DaprHttpClient exception handling
  • URL Encoding: Proper encoding of pub/sub component names and topic names
  • Query String Building: Metadata support via query string parameters

Architecture

Core Components

  • DaprPubSubClient (Abstract): Base class defining the pub/sub client contract
  • DefaultDaprPubSubClient: Concrete implementation using HttpClient for Dapr REST API calls
  • IDaprPubSubClientFactory: Factory interface for creating named clients
  • DefaultDaprPubSubClientFactory: Default factory implementation
  • DaprPubSubOptions: Configuration options for pub/sub clients
  • BulkPublishRequestEntry: Model for bulk publishing operations

Installation

# Core abstractions
dotnet add package DNV.Dapr.PubSub.Abstractions

# Full implementation with HttpClient injection
dotnet add package DNV.Dapr.PubSub

Getting Started

Basic Setup with Dependency Injection

Configure named Dapr PubSub clients with HttpClient injection:

using DNV.Dapr.PubSub;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// Register named Dapr PubSub client
services.AddDaprPubSubClient(
    "orderEvents", 
    o =>
    {
		o.HttpPort = 3501;
        o.PubSubName = "order-pubsub";
        o.TopicName = "order-events";
    }
);

var serviceProvider = services.BuildServiceProvider();
var factory = serviceProvider.GetRequiredService<IDaprPubSubClientFactory>();
var pubsubClient = factory.CreateClient("orderEvents");

Configuration with IConfiguration

// appsettings.json
{
  "PubSubClients": {
    "OrderEvents": {
      "PubSubName": "order-pubsub",
      "TopicName": "order-events"
    },
    "UserNotifications": {
      "PubSubName": "notification-pubsub",
      "TopicName": "user-notifications"
    }
  }
}
// Bind configuration sections
services.AddDaprPubSubClient(
    "OrderEvents",
    configuration.GetSection("PubSubClients:OrderEvents")
);

services.AddDaprPubSubClient(
    "UserNotifications", 
    configuration.GetSection("PubSubClients:UserNotifications")
);

Core API Methods

The DaprPubSubClient abstract class provides four main methods:

Single Message Publishing

// Publish to default topic
Task PublishAsync(object message, IDictionary<string, string>? metadata = null, CancellationToken ct = default)

// Publish to specific topic  
Task PublishAsync(object message, string topicName, IDictionary<string, string>? metadata = null, CancellationToken ct = default)

Bulk Message Publishing

// Bulk publish to default topic
Task BulkPublishAsync(IEnumerable<object> messages, IDictionary<string, string>? metadata = null, CancellationToken ct = default)

// Bulk publish to specific topic
Task BulkPublishAsync(IEnumerable<object> messages, string topicName, IDictionary<string, string>? metadata = null, CancellationToken ct = default)

Usage Examples

Basic Message Publishing

Publish to Default Topic
var orderEvent = new OrderCreatedEvent
{
    OrderId = "ORD-001",
    CustomerId = "CUST-123",
    Amount = 99.99m,
    CreatedAt = DateTime.UtcNow
};

// Publish single message to default topic
await pubsubClient.PublishAsync(orderEvent);
Publish to Specific Topic
// Override default topic
var userEvent = new UserRegisteredEvent
{
    UserId = "USER-456",
    Email = "user@example.com",
    RegistrationDate = DateTime.UtcNow
};

await pubsubClient.PublishAsync(userEvent, "user-events");

Bulk Publishing

Efficiently publish multiple messages in a single operation:

var orderEvents = new OrderCreatedEvent[]
{
    new() { OrderId = "ORD-001", CustomerId = "CUST-123" },
    new() { OrderId = "ORD-002", CustomerId = "CUST-456" },
    new() { OrderId = "ORD-003", CustomerId = "CUST-789" }
};

// Bulk publish to default topic
await pubsubClient.BulkPublishAsync(orderEvents);

// Bulk publish to specific topic
await pubsubClient.BulkPublishAsync(orderEvents, "order-events");

Publishing with Metadata

Add custom metadata to messages:

var metadata = new Dictionary<string, string>
{
    ["source"] = "order-service",
    ["version"] = "1.0",
    ["correlation-id"] = Guid.NewGuid().ToString(),
    ["trace-id"] = Activity.Current?.Id ?? "unknown"
};

var orderEvent = new OrderUpdatedEvent
{
    OrderId = "ORD-001",
    Status = "Shipped",
    UpdatedAt = DateTime.UtcNow
};

await pubsubClient.PublishAsync(orderEvent, metadata);

Multiple Named Clients

Configure and use multiple pub/sub clients for different business domains:

// Configure multiple clients
services.AddDaprPubSubClient("orders", options =>
{
    options.PubSubName = "order-pubsub";
    options.TopicName = "order-events";
});

services.AddDaprPubSubClient("notifications", options =>
{
    options.PubSubName = "notification-pubsub";
    options.TopicName = "email-notifications";
});

// Use in application
public class OrderService
{
    private readonly DaprPubSubClient _orderClient;
    private readonly DaprPubSubClient _notificationClient;

    public OrderService(IDaprPubSubClientFactory factory)
    {
        _orderClient = factory.CreateClient("orders");
        _notificationClient = factory.CreateClient("notifications");
    }

    public async Task ProcessOrderAsync(Order order)
    {
        // Publish order event
        await _orderClient.PublishAsync(new OrderCreatedEvent { OrderId = order.Id });
        
        // Publish notification
        await _notificationClient.PublishAsync(new EmailNotification 
        { 
            To = order.CustomerEmail, 
            Subject = "Order Confirmation" 
        });
    }
}

Advanced Configuration

HttpClient Customization

The key benefit of this implementation is full HttpClient control:

services.AddDaprPubSubClient("monitored", options =>
{
    options.PubSubName = "events";
    options.TopicName = "system-events";
});

// Configure the underlying HttpClient
services.ConfigureHttpClientDefaults(builder =>
{
    builder.AddStandardResilienceHandler(); // Polly integration
    builder.ConfigureHttpClient(client =>
    {
        client.Timeout = TimeSpan.FromSeconds(60);
        client.DefaultRequestHeaders.Add("X-Publisher", "MyService");
    });
});

Custom HttpClient Handlers

Add middleware-style handlers for cross-cutting concerns:

public class PublishingMetricsHandler : DelegatingHandler
{
    private readonly IMetrics _metrics;

    public PublishingMetricsHandler(IMetrics metrics)
    {
        _metrics = metrics;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        using var timer = _metrics.Measure.Timer.Time("pubsub.publish.duration");
        
        try
        {
            var response = await base.SendAsync(request, cancellationToken);
            _metrics.Measure.Counter.Increment("pubsub.publish.success");
            return response;
        }
        catch
        {
            _metrics.Measure.Counter.Increment("pubsub.publish.error");
            throw;
        }
    }
}

// Register the handler
services.AddHttpClient("monitored")
    .AddHttpMessageHandler<PublishingMetricsHandler>();

Cancellation Token Support

All methods support cancellation tokens for proper async operation control:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

try
{
    await pubsubClient.PublishAsync(
        criticalEvent,
        "important-events", 
        metadata: null,
        ct: cts.Token
    );
}
catch (OperationCanceledException)
{
    Console.WriteLine("Publish operation was cancelled due to timeout");
}

Configuration Model

DaprPubSubOptions

public class DaprPubSubOptions : DaprHttpClientOptions
{
    /// <summary>
    /// Name of the Dapr pub/sub component (required).
    /// Must match the component name in your Dapr configuration.
    /// </summary>
    public string? PubSubName { get; set; }

    /// <summary>
    /// Default topic name for publishing (optional).
    /// Can be overridden in individual publish calls.
    /// </summary>
    public string? TopicName { get; set; }
}

Configuration Examples

Environment-based configuration:

{
  "PubSubClients": {
    "Orders": {
      "PubSubName": "redis-pubsub",
      "TopicName": "order-events"
    }
  }
}

Multiple environments:

{
  "Development": {
    "PubSubClients": {
      "Orders": {
        "PubSubName": "memory-pubsub",
        "TopicName": "dev-order-events"
      }
    }
  },
  "Production": {
    "PubSubClients": {
      "Orders": {
        "PubSubName": "azure-servicebus",
        "TopicName": "prod-order-events"
      }
    }
  }
}

Testing

Unit Testing with Mock HttpClient

[Test]
public async Task PublishAsync_WithValidMessage_PublishesSuccessfully()
{
    // Arrange
    var mockHandler = new MockHttpMessageHandler();
    mockHandler
        .When("POST", "http://localhost:3500/v1.0/publish/test-pubsub/test-topic")
        .Respond(HttpStatusCode.NoContent);

    var httpClient = new HttpClient(mockHandler)
    {
        BaseAddress = new Uri("http://localhost:3500")
    };

    var options = new DaprPubSubOptions
    {
        PubSubName = "test-pubsub",
        TopicName = "test-topic"
    };

    var client = new DefaultDaprPubSubClient(httpClient, options);
    var testEvent = new { Message = "Test Event", Timestamp = DateTime.UtcNow };

    // Act
    await client.PublishAsync(testEvent);

    // Assert
    mockHandler.VerifyNoOutstandingExpectation();
}

Troubleshooting

Common Issues

  1. "PubSub component not found"

    • Verify PubSubName matches your Dapr component configuration
    • Check Dapr sidecar logs for component loading errors
  2. "Topic not found"

    • Some pub/sub backends require manual topic creation
    • Check if your pub/sub component supports auto-topic creation
  3. Message size limits

    • Redis: ~512MB default
    • Azure Service Bus: 1MB (Standard), 100MB (Premium)
    • Consider message compression for large payloads
  4. Serialization issues

    • Ensure your event models are JSON serializable
    • Check for circular references in object graphs
  5. Network timeouts

    • Increase HttpClient timeout for large messages
    • Implement retry policies for transient failures
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.0 1,277 2/8/2025
2.1.0-preview.1756698443 332 9/1/2025
2.1.0-preview.1756370241 167 8/28/2025
2.1.0-preview.1756358515 162 8/28/2025
2.1.0-preview.1756200584 193 8/26/2025
2.1.0-preview.1756199251 181 8/26/2025
2.1.0-preview.1755771145 118 8/21/2025