TAF.Infra.MassTransit 1.0.3

There is a newer version of this package available.
See the version list below for details.
dotnet add package TAF.Infra.MassTransit --version 1.0.3
                    
NuGet\Install-Package TAF.Infra.MassTransit -Version 1.0.3
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="TAF.Infra.MassTransit" Version="1.0.3" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="TAF.Infra.MassTransit" Version="1.0.3" />
                    
Directory.Packages.props
<PackageReference Include="TAF.Infra.MassTransit" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add TAF.Infra.MassTransit --version 1.0.3
                    
#r "nuget: TAF.Infra.MassTransit, 1.0.3"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package TAF.Infra.MassTransit@1.0.3
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=TAF.Infra.MassTransit&version=1.0.3
                    
Install as a Cake Addin
#tool nuget:?package=TAF.Infra.MassTransit&version=1.0.3
                    
Install as a Cake Tool

TAF.Infra.MassTransit

A comprehensive abstraction layer over MediatR and MassTransit that provides unified Command, Query, and Event handling patterns with support for both local (in-process) and distributed (message broker) execution.

๐Ÿš€ Features

  • Unified CQRS Pattern: Clean separation of Commands, Queries, and Events
  • Dual Execution Modes:
    • Local execution via MediatR
    • Distributed execution via MassTransit (RabbitMQ, Azure Service Bus, Amazon SQS)
  • Context-Aware: Built-in context propagation for multi-tenant, multi-app scenarios
  • Multiple Message Brokers: Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory
  • Automatic Registration: Auto-discovery and registration of handlers
  • Retry Policies: Configurable retry mechanisms with exponential backoff
  • Type Safety: Strongly-typed message contracts and handlers

๐Ÿ“ฆ Installation

dotnet add package TAF.Infra.MassTransit

๐Ÿ—๏ธ Architecture Overview

The library provides three main abstractions:

  • Commands: Actions that change system state (fire-and-forget or with response)
  • Queries: Read-only operations that return data (always local via MediatR)
  • Events: Notifications about things that have happened (distributed via message broker)

Execution Patterns

Pattern Local (MediatR) Distributed (MassTransit)
Commands โœ… SendAsync() โœ… PublishAsync()
Queries โœ… SendAsync() โŒ (Always local)
Events โŒ โœ… PublishAsync()

๐Ÿš€ Quick Start

1. Configuration

appsettings.json:

{
  "MessageBus": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "ExchangeName": "default",
    "RetryLimit": 5,
    "UseMessageScheduler": true
  }
}

Program.cs:

using TAF.Infra.MassTransit;
using TAF.Infra.MassTransit.Enums;

var builder = WebApplication.CreateBuilder(args);

// Add message bus with RabbitMQ
builder.Services.AddMessageBus(options =>
{
    options.Provider = MessageBusConfigurationEnum.RabbitMQ;
    options.Settings["Host"] = "localhost";
    options.Settings["Username"] = "guest";
    options.Settings["Password"] = "guest";
});

// Or configure from appsettings.json
builder.Services.AddMessageBus(builder.Configuration);

2. Create Messages

Commands:

using TAF.Infra.MassTransit.Base;

// Fire-and-forget command
public class CreateUserCommand : BaseCommand
{
    public string Name { get; set; }
    public string Email { get; set; }
}

// Command with response
public class GetUserCommand : BaseCommand<UserResponse>
{
    public Guid UserId { get; set; }
}

public class UserResponse
{
    public string Name { get; set; }
    public string Email { get; set; }
}

Queries:

using TAF.Infra.MassTransit.Base;

public class GetUsersQuery : BaseQuery<List<UserResponse>>
{
    public int PageSize { get; set; } = 10;
    public int PageNumber { get; set; } = 1;
}

Events:

using TAF.Infra.MassTransit.Base;

public class UserCreatedEvent : BaseEvent
{
    public Guid UserId { get; set; }
    public string Name { get; set; }
    public string Email { get; set; }
}

3. Create Handlers

Command Handler:

using TAF.Infra.MassTransit.Abstractions;
using TAF.Infra.MassTransit.Abstractions.Command;

public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
    private readonly IUserRepository _userRepository;
    private readonly IEventBus _eventBus;

    public CreateUserCommandHandler(IUserRepository userRepository, IEventBus eventBus)
    {
        _userRepository = userRepository;
        _eventBus = eventBus;
    }

    public async Task HandleAsync(CreateUserCommand command, Context context, 
        CancellationToken cancellationToken = default)
    {
        var user = new User 
        { 
            Name = command.Name, 
            Email = command.Email 
        };
        
        await _userRepository.CreateAsync(user);

        // Publish event
        var userCreatedEvent = new UserCreatedEvent
        {
            UserId = user.Id,
            Name = user.Name,
            Email = user.Email
        };

        await _eventBus.PublishAsync(userCreatedEvent, context, cancellationToken);
    }
}

Query Handler:

using TAF.Infra.MassTransit.Abstractions.Query;

public class GetUsersQueryHandler : IQueryHandler<GetUsersQuery, List<UserResponse>>
{
    private readonly IUserRepository _userRepository;

    public GetUsersQueryHandler(IUserRepository userRepository)
    {
        _userRepository = userRepository;
    }

    public async Task<List<UserResponse>> HandleAsync(GetUsersQuery query, Context context, 
        CancellationToken cancellationToken = default)
    {
        var users = await _userRepository.GetPagedAsync(query.PageNumber, query.PageSize);
        return users.Select(u => new UserResponse { Name = u.Name, Email = u.Email }).ToList();
    }
}

Event Handler:

using TAF.Infra.MassTransit.Abstractions.Event;

public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;

    public UserCreatedEventHandler(IEmailService emailService)
    {
        _emailService = emailService;
    }

    public async Task HandleAsync(UserCreatedEvent @event, Context context, 
        CancellationToken cancellationToken = default)
    {
        await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);
    }
}

4. Usage in Controllers/Services

[ApiController]
[Route("api/[controller]")]
public class UsersController : ControllerBase
{
    private readonly ICommandBus _commandBus;
    private readonly IQueryBus _queryBus;

    public UsersController(ICommandBus commandBus, IQueryBus queryBus)
    {
        _commandBus = commandBus;
        _queryBus = queryBus;
    }

    [HttpPost]
    public async Task<IActionResult> CreateUser([FromBody] CreateUserRequest request)
    {
        var context = new Context
        {
            CorrelationId = Guid.NewGuid(),
            UserId = User.Identity.Name,
            TenantId = "tenant-1"
        };

        var command = new CreateUserCommand 
        { 
            Name = request.Name, 
            Email = request.Email 
        };

        // Execute locally via MediatR
        await _commandBus.SendAsync<CreateUserCommand, Unit>(command, context);
        
        // OR execute distributed via MassTransit
        await _commandBus.PublishAsync(command, context, "user-service-queue");

        return Ok();
    }

    [HttpGet]
    public async Task<IActionResult> GetUsers([FromQuery] int page = 1, [FromQuery] int size = 10)
    {
        var context = new Context
        {
            CorrelationId = Guid.NewGuid(),
            UserId = User.Identity.Name
        };

        var query = new GetUsersQuery 
        { 
            PageNumber = page, 
            PageSize = size 
        };

        // Queries are always executed locally
        var result = await _queryBus.SendAsync<GetUsersQuery, List<UserResponse>>(query, context);

        return Ok(result);
    }
}

โš™๏ธ Configuration Options

Message Broker Providers

services.AddMessageBus(options =>
{
    // RabbitMQ (default)
    options.Provider = MessageBusConfigurationEnum.RabbitMQ;
    
    // Azure Service Bus
    options.Provider = MessageBusConfigurationEnum.AzureServiceBus;
    
    // Amazon SQS
    options.Provider = MessageBusConfigurationEnum.AmazonSQS;
    
    // In-Memory (testing)
    options.Provider = MessageBusConfigurationEnum.InMemory;
});

Advanced Configuration

services.Configure<MessageBrokerSettings>(options =>
{
    options.Host = "rabbitmq.example.com";
    options.Port = 5672;
    options.Username = "myuser";
    options.Password = "mypassword";
    options.VirtualHost = "/myapp";
    options.UseSsl = true;
    options.PrefetchCount = 50;
    
    // Retry settings
    options.MassTransit.RetryPolicy.RetryLimit = 3;
    options.MassTransit.RetryPolicy.InitialIntervalMs = 1000;
    options.MassTransit.RetryPolicy.MaxIntervalMs = 30000;
});

๐Ÿ›๏ธ Context and Multi-Tenancy

The library provides built-in context support for cross-cutting concerns:

var context = new Context
{
    CorrelationId = Guid.NewGuid(),
    TenantId = "tenant-123",
    AppId = "web-app",
    UserId = "user-456",
    EnvironmentId = "production"
};

// Add custom properties
context.SetProperty("TraceId", "trace-789");
context.SetProperty("ClientIP", "192.168.1.1");

๐Ÿ”„ Execution Patterns

Local Execution (MediatR)

  • Fast, in-process execution
  • Suitable for simple operations
  • Automatic transaction boundaries

Distributed Execution (MassTransit)

  • Resilient, scalable execution
  • Cross-service communication
  • Built-in retry and error handling

๐Ÿงช Testing Support

// Use In-Memory provider for testing
services.AddMessageBus(options =>
{
    options.Provider = MessageBusConfigurationEnum.InMemory;
});

๐Ÿ”ง Advanced Features

Custom Base Classes

public class AuditableCommand : BaseCommand
{
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public string CreatedBy { get; set; }
}

public class CreateOrderCommand : AuditableCommand
{
    public List<OrderItem> Items { get; set; }
    public decimal Total { get; set; }
}

Multiple Handlers

// Multiple event handlers for the same event
public class EmailNotificationHandler : IEventHandler<UserCreatedEvent> { ... }
public class AuditLogHandler : IEventHandler<UserCreatedEvent> { ... }
public class MetricsHandler : IEventHandler<UserCreatedEvent> { ... }

๐Ÿ“š API Reference

Core Interfaces

  • ICommandBus - Command execution (local/distributed)
  • IQueryBus - Query execution (local only)
  • IEventBus - Event publishing (distributed)
  • ICommandHandler<T> - Command handler interface
  • IQueryHandler<T, R> - Query handler interface
  • IEventHandler<T> - Event handler interface

Base Classes

  • BaseCommand - Base class for commands
  • BaseCommand<T> - Base class for commands with response
  • BaseQuery<T> - Base class for queries
  • BaseEvent - Base class for events

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

๐Ÿ“„ License

This project is licensed under the MIT License.

๐Ÿ› Issues

If you encounter any issues, please file them in the GitHub Issues section.


Built with โค๏ธ for distributed systems

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Initial release featuring:
- Unified CQRS pattern with Commands, Queries, and Events
- Dual execution modes (local via MediatR, distributed via MassTransit)
- Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory brokers
- Built-in context propagation for multi-tenant scenarios
- Automatic handler discovery and registration
- Configurable retry policies and error handling
- Type-safe message contracts and handlers