TAF.Infra.MassTransit 1.0.4


TAF.Infra.MassTransit

A comprehensive abstraction layer over MediatR and MassTransit that provides unified Command, Query, and Event handling patterns with support for both local (in-process) and distributed (message broker) execution.

🚀 Features

  • Unified CQRS Pattern: Clean separation of Commands, Queries, and Events
  • Dual Execution Modes:
    • Local execution via MediatR
    • Distributed execution via MassTransit (RabbitMQ, Azure Service Bus, Amazon SQS)
  • Smart Queue Naming: Configurable naming conventions with automatic sanitization and duplicate suffix removal
  • Context-Aware: Built-in context propagation for multi-tenant, multi-app scenarios with helper utilities
  • Multiple Message Brokers: Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory
  • Automatic Registration: Auto-discovery and registration of handlers
  • Retry Policies: Configurable retry mechanisms with exponential backoff
  • Type Safety: Strongly-typed message contracts and handlers
  • Environment Support: Multi-environment queue/topic naming with project and environment identifiers

📦 Installation

dotnet add package TAF.Infra.MassTransit

🗃️ Architecture Overview

The library provides three main abstractions:

  • Commands: Actions that change system state (fire-and-forget or with response)
  • Queries: Read-only operations that return data (always local via MediatR)
  • Events: Notifications about things that have happened (distributed via message broker)

Execution Patterns

| Pattern  | Local (MediatR) | Distributed (MassTransit) |
|----------|-----------------|---------------------------|
| Commands | ✅ SendAsync()  | ✅ PublishAsync()         |
| Queries  | ✅ SendAsync()  | ❌ (Always local)         |
| Events   | ❌              | ✅ PublishAsync()         |

Queue/Topic Naming Convention

The library automatically generates structured names for queues and topics:

  • Commands: {projectName}-{environment}-command-{commandName}
  • Events: {projectName}-{environment}-event-{eventName}

Examples:

  • CreateUserCommand → myproject-prod-command-createuser
  • UserCreatedEvent → myproject-prod-event-usercreated
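
The naming rules above can be illustrated with a small standalone sketch. This is not the library's implementation: the `QueueNameSketch` class, its `Build` method, and the sanitization rule (any character outside `a-z`, `0-9`, `.` and `-` becomes `-`) are assumptions made for illustration. Only the template shape, the lowercasing, and the single-suffix removal are taken from the examples in this README.

```csharp
using System;
using System.Text.RegularExpressions;

const string commandTemplate = "{projectName}-{environment}-command-{commandName}";

// prints "myproject-prod-command-createuser"
Console.WriteLine(QueueNameSketch.Build(
    commandTemplate, "MyProject", "prod", "{commandName}", "CreateUserCommand", "Command"));

// prints "myproject-prod-command-createusercommand" (only the last "Command" is removed)
Console.WriteLine(QueueNameSketch.Build(
    commandTemplate, "MyProject", "prod", "{commandName}", "CreateUserCommandCommand", "Command"));

// Hypothetical helper, not part of the TAF.Infra.MassTransit API.
public static class QueueNameSketch
{
    public static string Build(string template, string projectName, string environment,
        string namePlaceholder, string typeName, string suffix)
    {
        // Remove a single trailing suffix ("Command" or "Event") if present.
        var baseName = typeName.Length > suffix.Length
                       && typeName.EndsWith(suffix, StringComparison.Ordinal)
            ? typeName.Substring(0, typeName.Length - suffix.Length)
            : typeName;

        // Fill the template, then lowercase the whole name.
        var name = template
            .Replace("{projectName}", projectName)
            .Replace("{environment}", environment)
            .Replace(namePlaceholder, baseName)
            .ToLowerInvariant();

        // Assumed sanitization rule: replace runs of unsupported characters with '-'.
        return Regex.Replace(name, "[^a-z0-9.-]+", "-");
    }
}
```

Passing the placeholder and suffix as arguments lets the same function cover events (`"{eventName}"`, `"Event"`) without a second code path.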

🚀 Quick Start

1. Configuration

appsettings.json:

{
  "MessageBus": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "ExchangeName": "default",
    "RetryLimit": 5,
    "UseMessageScheduler": true
  }
}

Program.cs:

using TAF.Infra.MassTransit;
using TAF.Infra.MassTransit.Enums;

var builder = WebApplication.CreateBuilder(args);

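// Register the message bus once: the three calls below are alternatives, so keep only one.

// Option 1: basic configuration from appsettings
builder.Services.AddMessageBus(builder.Configuration);

// Option 2: configuration from appsettings with custom naming conventions
builder.Services.AddMessageBusWithNaming(
    builder.Configuration,
    projectName: "MyProject",
    environment: "production"
);

// Option 3: manual configuration with naming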
builder.Services.AddMessageBusWithNaming(
    options =>
    {
        options.Provider = MessageBusConfigurationEnum.RabbitMQ;
        options.Settings["Host"] = "localhost";
        options.Settings["Username"] = "guest";
        options.Settings["Password"] = "guest";
    },
    projectName: "MyProject",
    environment: "dev",
    commandQueueTemplate: "{projectName}-{environment}-cmd-{commandName}",
    eventTopicTemplate: "{projectName}-{environment}-evt-{eventName}"
);

2. Create Messages

Commands:

using TAF.Infra.MassTransit.Base;

// Fire-and-forget command
public class CreateUserCommand : BaseCommand
{
    public string Name { get; set; }
    public string Email { get; set; }
}

// Command with response
public class GetUserCommand : BaseCommand<UserResponse>
{
    public Guid UserId { get; set; }
}

public class UserResponse
{
    public string Name { get; set; }
    public string Email { get; set; }
}

Queries:

using TAF.Infra.MassTransit.Base;

public class GetUsersQuery : BaseQuery<List<UserResponse>>
{
    public int PageSize { get; set; } = 10;
    public int PageNumber { get; set; } = 1;
}

Events:

using TAF.Infra.MassTransit.Base;

public class UserCreatedEvent : BaseEvent
{
    public Guid UserId { get; set; }
    public string Name { get; set; }
    public string Email { get; set; }
}

3. Create Handlers

Command Handler:

using TAF.Infra.MassTransit.Abstractions;
using TAF.Infra.MassTransit.Abstractions.Command;

public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
    private readonly IUserRepository _userRepository;
    private readonly IEventBus _eventBus;

    public CreateUserCommandHandler(IUserRepository userRepository, IEventBus eventBus)
    {
        _userRepository = userRepository;
        _eventBus = eventBus;
    }

    public async Task HandleAsync(CreateUserCommand command, Context context, 
        CancellationToken cancellationToken = default)
    {
        var user = new User 
        { 
            Name = command.Name, 
            Email = command.Email 
        };
        
        await _userRepository.CreateAsync(user);

        // Publish event
        var userCreatedEvent = new UserCreatedEvent
        {
            UserId = user.Id,
            Name = user.Name,
            Email = user.Email
        };

        await _eventBus.PublishAsync(userCreatedEvent, context, cancellationToken);
    }
}

Query Handler:

using TAF.Infra.MassTransit.Abstractions.Query;

public class GetUsersQueryHandler : IQueryHandler<GetUsersQuery, List<UserResponse>>
{
    private readonly IUserRepository _userRepository;

    public GetUsersQueryHandler(IUserRepository userRepository)
    {
        _userRepository = userRepository;
    }

    public async Task<List<UserResponse>> HandleAsync(GetUsersQuery query, Context context, 
        CancellationToken cancellationToken = default)
    {
        var users = await _userRepository.GetPagedAsync(query.PageNumber, query.PageSize);
        return users.Select(u => new UserResponse { Name = u.Name, Email = u.Email }).ToList();
    }
}

Event Handler:

using TAF.Infra.MassTransit.Abstractions.Event;

public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;

    public UserCreatedEventHandler(IEmailService emailService)
    {
        _emailService = emailService;
    }

    public async Task HandleAsync(UserCreatedEvent @event, Context context, 
        CancellationToken cancellationToken = default)
    {
        await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);
    }
}

4. Usage in Controllers/Services

using MediatR; // Unit (assumed here to be MediatR's Unit type)
using Microsoft.AspNetCore.Mvc;
using TAF.Infra.MassTransit.Helpers;

[ApiController]
[Route("api/[controller]")]
public class UsersController : ControllerBase
{
    private readonly ICommandBus _commandBus;
    private readonly IQueryBus _queryBus;

    public UsersController(ICommandBus commandBus, IQueryBus queryBus)
    {
        _commandBus = commandBus;
        _queryBus = queryBus;
    }

    [HttpPost]
    public async Task<IActionResult> CreateUser([FromBody] CreateUserRequest request)
    {
        // Create context from HTTP headers (extracts tenant, user info, etc.)
        var context = ContextHelper.CreateFromHeaders(Request.Headers);

        var command = new CreateUserCommand 
        { 
            Name = request.Name, 
            Email = request.Email 
        };

        // Option A: execute locally via MediatR
        await _commandBus.SendAsync<CreateUserCommand, Unit>(command, context);
        
        // Option B: execute distributed via MassTransit instead (use one option, not both)
        // Queue: myproject-prod-command-createuser
        // await _commandBus.PublishAsync(command, context);

        return Ok();
    }

    [HttpGet]
    public async Task<IActionResult> GetUsers([FromQuery] int page = 1, [FromQuery] int size = 10)
    {
        // Create context from parameters
        var context = ContextHelper.CreateFromParameters(
            userId: User.Identity?.Name, // Identity can be null for unauthenticated requests
            tenantId: "tenant-123",
            appId: "web-api",
            environmentId: "production"
        );

        var query = new GetUsersQuery 
        { 
            PageNumber = page, 
            PageSize = size 
        };

        // Queries are always executed locally
        var result = await _queryBus.SendAsync<GetUsersQuery, List<UserResponse>>(query, context);

        return Ok(result);
    }
}

⚙️ Configuration Options

Message Broker Providers

services.AddMessageBus(options =>
{
    // Set exactly one provider; the assignments below are alternatives.

    // RabbitMQ (default)
    options.Provider = MessageBusConfigurationEnum.RabbitMQ;
    
    // Azure Service Bus
    options.Provider = MessageBusConfigurationEnum.AzureServiceBus;
    
    // Amazon SQS
    options.Provider = MessageBusConfigurationEnum.AmazonSQS;
    
    // In-Memory (testing)
    options.Provider = MessageBusConfigurationEnum.InMemory;
});

Smart Naming Configuration

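// Default templates (used when the template arguments are omitted):
//   {projectName}-{environment}-command-{commandName}
//   {projectName}-{environment}-event-{eventName}
// The call below overrides them with dot-separated templates.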
services.AddMessageBusWithNaming(
    configuration,
    projectName: "MyProject",        // Default: assembly name
    environment: "production",       // Default: "dev"
    commandQueueTemplate: "{projectName}.{environment}.cmd.{commandName}",
    eventTopicTemplate: "{projectName}.{environment}.evt.{eventName}"
);

Naming Examples:

// Class: CreateUserCommand
// Result: myproject-prod-command-createuser

// Class: CreateUserCommandCommand (duplicate suffix)
// Result: myproject-prod-command-createusercommand (removes last "Command")

// Class: UserCreatedEvent  
// Result: myproject-prod-event-usercreated

// Class: UserCreatedEventEvent (duplicate suffix)
// Result: myproject-prod-event-usercreatedevent (removes last "Event")

Advanced Configuration

services.Configure<MessageBrokerSettings>(options =>
{
    options.Host = "rabbitmq.example.com";
    options.Port = 5672;
    options.Username = "myuser";
    options.Password = "mypassword";
    options.VirtualHost = "/myapp";
    options.UseSsl = true;
    options.PrefetchCount = 50;
    
    // Retry settings
    options.MassTransit.RetryPolicy.RetryLimit = 3;
    options.MassTransit.RetryPolicy.InitialIntervalMs = 1000;
    options.MassTransit.RetryPolicy.MaxIntervalMs = 30000;
});
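
To make the three retry settings concrete, the sketch below computes a delay schedule from them. It assumes a plain doubling backoff capped at the maximum interval. The README only states that backoff is exponential, so the exact formula used by the library (or by MassTransit underneath) may differ, and `BackoffSketch` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

// Same values as the retry settings above.
var delays = BackoffSketch.Delays(retryLimit: 3, initialIntervalMs: 1000, maxIntervalMs: 30000);
Console.WriteLine(string.Join(", ", delays)); // prints "1000, 2000, 4000"

// Hypothetical helper, not part of the TAF.Infra.MassTransit API.
public static class BackoffSketch
{
    // Returns one delay (in milliseconds) per retry attempt.
    public static IReadOnlyList<int> Delays(int retryLimit, int initialIntervalMs, int maxIntervalMs)
    {
        var result = new List<int>(Math.Max(retryLimit, 0));
        long delay = initialIntervalMs;

        for (var attempt = 0; attempt < retryLimit; attempt++)
        {
            // Never wait longer than the configured maximum.
            result.Add((int)Math.Min(delay, maxIntervalMs));

            // Assumed growth rule: double the delay after each attempt.
            delay = Math.Min(delay * 2, maxIntervalMs);
        }

        return result;
    }
}
```

Under this assumption, a retry limit of 6 with the same intervals would reach the cap on the last attempt (1000, 2000, 4000, 8000, 16000, 30000).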

🛠️ Context and Multi-Tenancy

The library provides built-in context support for cross-cutting concerns:

Context Helper Utilities

using TAF.Infra.MassTransit.Helpers;

// Create context from HTTP headers (in controllers/middleware)
var headerContext = ContextHelper.CreateFromHeaders(Request.Headers);

// Create context from parameters (in services/background jobs)
var parameterContext = ContextHelper.CreateFromParameters(
    userId: "user-123",
    tenantId: "tenant-456",
    appId: "web-api",
    environmentId: "production",
    correlationId: Guid.NewGuid(),
    properties: new Dictionary<string, object>
    {
        { "TraceId", "trace-789" },
        { "ClientIP", "192.168.1.1" }
    }
);

// Manual context creation
var context = new Context
{
    CorrelationId = Guid.NewGuid(),
    TenantId = "tenant-123",
    AppId = "web-app",
    UserId = "user-456",
    EnvironmentId = "production"
};

// Add custom properties
context.SetProperty("TraceId", "trace-789");
context.SetProperty("ClientIP", "192.168.1.1");

Context in Middleware

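// Factory-based middleware: it implements IMiddleware, so it must also be registered in DI.
public class ContextMiddleware : IMiddleware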
{
    public async Task InvokeAsync(HttpContext httpContext, RequestDelegate next)
    {
        // Automatically extract context from headers
        var messageContext = ContextHelper.CreateFromHeaders(httpContext.Request.Headers);
        
        // Make the context available to downstream code for the duration of the request
        using var scope = new ContextScope(messageContext);
        
        await next(httpContext);
    }
}
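
The README does not show how `ContextScope` works internally. One common way to build such a scope is an ambient value held in `AsyncLocal<T>` that is restored on dispose. The sketch below illustrates that technique only: `AmbientScopeSketch<T>` is a hypothetical type, not the library's `ContextScope`, and a plain string stands in for the `Context` object.

```csharp
using System;
using System.Threading;

using (new AmbientScopeSketch<string>("tenant-123"))
{
    // Inside the scope, any code on this async flow can read the value.
    Console.WriteLine(AmbientScopeSketch<string>.Current); // prints "tenant-123"
}

// After dispose, the previous value (none here) is restored.
Console.WriteLine(AmbientScopeSketch<string>.Current is null); // prints "True"

// Hypothetical type for illustration; not part of TAF.Infra.MassTransit.
public sealed class AmbientScopeSketch<T> : IDisposable where T : class
{
    // AsyncLocal flows with async/await, so the value follows the request.
    private static readonly AsyncLocal<T?> Slot = new();
    private readonly T? _previous;

    public AmbientScopeSketch(T value)
    {
        _previous = Slot.Value; // remember the outer scope, if any
        Slot.Value = value;
    }

    public static T? Current => Slot.Value;

    // Restore the outer value so nested scopes unwind correctly.
    public void Dispose() => Slot.Value = _previous;
}
```

Saving and restoring the previous value is what makes nested scopes safe: an inner scope cannot leak its value into the outer one once it is disposed.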

📄 Execution Patterns

Local Execution (MediatR)

  • Fast, in-process execution
  • Suitable for simple operations
  • Automatic transaction boundaries
  • Commands and Queries

Distributed Execution (MassTransit)

  • Resilient, scalable execution
  • Cross-service communication
  • Built-in retry and error handling
  • Commands and Events only

🧪 Testing Support

// Use In-Memory provider for testing
services.AddMessageBusWithNaming(
    options =>
    {
        options.Provider = MessageBusConfigurationEnum.InMemory;
    },
    projectName: "TestProject",
    environment: "test"
);

🔧 Advanced Features

Custom Base Classes

public class AuditableCommand : BaseCommand
{
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public string CreatedBy { get; set; }
}

public class CreateOrderCommand : AuditableCommand
{
    public List<OrderItem> Items { get; set; }
    public decimal Total { get; set; }
}

Multiple Handlers

// Multiple event handlers for the same event
public class EmailNotificationHandler : IEventHandler<UserCreatedEvent> { /* ... */ }
public class AuditLogHandler : IEventHandler<UserCreatedEvent> { /* ... */ }
public class MetricsHandler : IEventHandler<UserCreatedEvent> { /* ... */ }

Queue Name Resolution

// The library automatically handles complex naming scenarios:

// Full class name: MyApp.Commands.User.CreateUserCommand
// Generated queue: myapp-prod-command-createuser

// Full class name: MyApp.Events.Order.OrderShippedEvent  
// Generated topic: myapp-prod-event-ordershipped

// Duplicate suffix handling:
// CreateUserCommandCommand → myapp-prod-command-createusercommand
// UserCreatedEventEvent → myapp-prod-event-usercreatedevent

📚 API Reference

Core Interfaces

  • ICommandBus - Command execution (local/distributed)
  • IQueryBus - Query execution (local only)
  • IEventBus - Event publishing (distributed)
  • ICommandHandler<T> - Command handler interface
  • ICommandHandler<T, R> - Command handler with response interface
  • IQueryHandler<T, R> - Query handler interface
  • IEventHandler<T> - Event handler interface

Base Classes

  • BaseCommand - Base class for commands
  • BaseCommand<T> - Base class for commands with response
  • BaseQuery<T> - Base class for queries
  • BaseEvent - Base class for events

Helper Classes

  • ContextHelper - Static helper for context creation
  • MessageNamingConfiguration - Configuration for queue/topic naming
  • ContextScope - Scoped context management

Configuration Classes

  • MessageBusConfiguration - Main configuration
  • MessageBrokerSettings - Broker-specific settings
  • MessageNamingConfiguration - Naming convention settings

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📄 License

This project is licensed under the MIT License.

🐛 Issues

If you encounter any issues, please file them in the GitHub Issues section.


Built with ❤️ for distributed systems

Target frameworks: net8.0 is compatible. Computed as compatible: net9.0 and net10.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0.

Initial release featuring:
- Unified CQRS pattern with Commands, Queries, and Events
- Dual execution modes (local via MediatR, distributed via MassTransit)
- Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory brokers
- Built-in context propagation for multi-tenant scenarios
- Automatic handler discovery and registration
- Configurable retry policies and error handling
- Type-safe message contracts and handlers