TAF.Infra.MassTransit
1.0.4
dotnet add package TAF.Infra.MassTransit --version 1.0.4
NuGet\Install-Package TAF.Infra.MassTransit -Version 1.0.4
<PackageReference Include="TAF.Infra.MassTransit" Version="1.0.4" />
<PackageVersion Include="TAF.Infra.MassTransit" Version="1.0.4" />
<PackageReference Include="TAF.Infra.MassTransit" />
paket add TAF.Infra.MassTransit --version 1.0.4
#r "nuget: TAF.Infra.MassTransit, 1.0.4"
#:package TAF.Infra.MassTransit@1.0.4
#addin nuget:?package=TAF.Infra.MassTransit&version=1.0.4
#tool nuget:?package=TAF.Infra.MassTransit&version=1.0.4
TAF.Infra.MassTransit
A comprehensive abstraction layer over MediatR and MassTransit that provides unified Command, Query, and Event handling patterns with support for both local (in-process) and distributed (message broker) execution.
๐ Features
- Unified CQRS Pattern: Clean separation of Commands, Queries, and Events
- Dual Execution Modes:
- Local execution via MediatR
- Distributed execution via MassTransit (RabbitMQ, Azure Service Bus, Amazon SQS)
- Smart Queue Naming: Configurable naming conventions with automatic sanitization and duplicate suffix removal
- Context-Aware: Built-in context propagation for multi-tenant, multi-app scenarios with helper utilities
- Multiple Message Brokers: Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory
- Automatic Registration: Auto-discovery and registration of handlers
- Retry Policies: Configurable retry mechanisms with exponential backoff
- Type Safety: Strongly-typed message contracts and handlers
- Environment Support: Multi-environment queue/topic naming with project and environment identifiers
๐ฆ Installation
dotnet add package TAF.Infra.MassTransit
๐๏ธ Architecture Overview
The library provides three main abstractions:
- Commands: Actions that change system state (fire-and-forget or with response)
- Queries: Read-only operations that return data (always local via MediatR)
- Events: Notifications about things that have happened (distributed via message broker)
Execution Patterns
Pattern | Local (MediatR) | Distributed (MassTransit) |
---|---|---|
Commands | โ
SendAsync() |
โ
PublishAsync() |
Queries | โ
SendAsync() |
โ (Always local) |
Events | โ | โ
PublishAsync() |
Queue/Topic Naming Convention
The library automatically generates structured names for queues and topics:
- Commands:
{projectName}-{environment}-command-{commandName}
- Events:
{projectName}-{environment}-event-{eventName}
Examples:
CreateUserCommand
โmyproject-prod-command-createuser
UserCreatedEvent
โmyproject-prod-event-usercreated
๐ Quick Start
1. Configuration
appsettings.json:
{
"MessageBus": {
"Host": "localhost",
"Port": 5672,
"Username": "guest",
"Password": "guest",
"VirtualHost": "/",
"ExchangeName": "default",
"RetryLimit": 5,
"UseMessageScheduler": true
}
}
Program.cs:
using TAF.Infra.MassTransit;
using TAF.Infra.MassTransit.Enums;
var builder = WebApplication.CreateBuilder(args);
// Basic configuration from appsettings
builder.Services.AddMessageBus(builder.Configuration);
// With custom naming conventions
builder.Services.AddMessageBusWithNaming(
builder.Configuration,
projectName: "MyProject",
environment: "production"
);
// Manual configuration with naming
builder.Services.AddMessageBusWithNaming(
options =>
{
options.Provider = MessageBusConfigurationEnum.RabbitMQ;
options.Settings["Host"] = "localhost";
options.Settings["Username"] = "guest";
options.Settings["Password"] = "guest";
},
projectName: "MyProject",
environment: "dev",
commandQueueTemplate: "{projectName}-{environment}-cmd-{commandName}",
eventTopicTemplate: "{projectName}-{environment}-evt-{eventName}"
);
2. Create Messages
Commands:
using TAF.Infra.MassTransit.Base;
// Fire-and-forget command
public class CreateUserCommand : BaseCommand
{
public string Name { get; set; }
public string Email { get; set; }
}
// Command with response
public class GetUserCommand : BaseCommand<UserResponse>
{
public Guid UserId { get; set; }
}
public class UserResponse
{
public string Name { get; set; }
public string Email { get; set; }
}
Queries:
using TAF.Infra.MassTransit.Base;
public class GetUsersQuery : BaseQuery<List<UserResponse>>
{
public int PageSize { get; set; } = 10;
public int PageNumber { get; set; } = 1;
}
Events:
using TAF.Infra.MassTransit.Base;
public class UserCreatedEvent : BaseEvent
{
public Guid UserId { get; set; }
public string Name { get; set; }
public string Email { get; set; }
}
3. Create Handlers
Command Handler:
using TAF.Infra.MassTransit.Abstractions;
using TAF.Infra.MassTransit.Abstractions.Command;
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
private readonly IUserRepository _userRepository;
private readonly IEventBus _eventBus;
public CreateUserCommandHandler(IUserRepository userRepository, IEventBus eventBus)
{
_userRepository = userRepository;
_eventBus = eventBus;
}
public async Task HandleAsync(CreateUserCommand command, Context context,
CancellationToken cancellationToken = default)
{
var user = new User
{
Name = command.Name,
Email = command.Email
};
await _userRepository.CreateAsync(user);
// Publish event
var userCreatedEvent = new UserCreatedEvent
{
UserId = user.Id,
Name = user.Name,
Email = user.Email
};
await _eventBus.PublishAsync(userCreatedEvent, context, cancellationToken);
}
}
Query Handler:
using TAF.Infra.MassTransit.Abstractions.Query;
public class GetUsersQueryHandler : IQueryHandler<GetUsersQuery, List<UserResponse>>
{
private readonly IUserRepository _userRepository;
public GetUsersQueryHandler(IUserRepository userRepository)
{
_userRepository = userRepository;
}
public async Task<List<UserResponse>> HandleAsync(GetUsersQuery query, Context context,
CancellationToken cancellationToken = default)
{
var users = await _userRepository.GetPagedAsync(query.PageNumber, query.PageSize);
return users.Select(u => new UserResponse { Name = u.Name, Email = u.Email }).ToList();
}
}
Event Handler:
using TAF.Infra.MassTransit.Abstractions.Event;
public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
private readonly IEmailService _emailService;
public UserCreatedEventHandler(IEmailService emailService)
{
_emailService = emailService;
}
public async Task HandleAsync(UserCreatedEvent @event, Context context,
CancellationToken cancellationToken = default)
{
await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);
}
}
4. Usage in Controllers/Services
using TAF.Infra.MassTransit.Helpers;
[ApiController]
[Route("api/[controller]")]
public class UsersController : ControllerBase
{
private readonly ICommandBus _commandBus;
private readonly IQueryBus _queryBus;
public UsersController(ICommandBus commandBus, IQueryBus queryBus)
{
_commandBus = commandBus;
_queryBus = queryBus;
}
[HttpPost]
public async Task<IActionResult> CreateUser([FromBody] CreateUserRequest request)
{
// Create context from HTTP headers (extracts tenant, user info, etc.)
var context = ContextHelper.CreateFromHeaders(Request.Headers);
var command = new CreateUserCommand
{
Name = request.Name,
Email = request.Email
};
// Execute locally via MediatR
await _commandBus.SendAsync<CreateUserCommand, Unit>(command, context);
// OR execute distributed via MassTransit
// Queue: myproject-prod-command-createuser
await _commandBus.PublishAsync(command, context);
return Ok();
}
[HttpGet]
public async Task<IActionResult> GetUsers([FromQuery] int page = 1, [FromQuery] int size = 10)
{
// Create context from parameters
var context = ContextHelper.CreateFromParameters(
userId: User.Identity.Name,
tenantId: "tenant-123",
appId: "web-api",
environmentId: "production"
);
var query = new GetUsersQuery
{
PageNumber = page,
PageSize = size
};
// Queries are always executed locally
var result = await _queryBus.SendAsync<GetUsersQuery, List<UserResponse>>(query, context);
return Ok(result);
}
}
โ๏ธ Configuration Options
Message Broker Providers
services.AddMessageBus(options =>
{
// RabbitMQ (default)
options.Provider = MessageBusConfigurationEnum.RabbitMQ;
// Azure Service Bus
options.Provider = MessageBusConfigurationEnum.AzureServiceBus;
// Amazon SQS
options.Provider = MessageBusConfigurationEnum.AmazonSQS;
// In-Memory (testing)
options.Provider = MessageBusConfigurationEnum.InMemory;
});
Smart Naming Configuration
// Default templates: {projectName}-{environment}-command-{commandName}
services.AddMessageBusWithNaming(
configuration,
projectName: "MyProject", // Default: assembly name
environment: "production", // Default: "dev"
commandQueueTemplate: "{projectName}.{environment}.cmd.{commandName}",
eventTopicTemplate: "{projectName}.{environment}.evt.{eventName}"
);
Naming Examples:
// Class: CreateUserCommand
// Result: myproject-prod-command-createuser
// Class: CreateUserCommandCommand (duplicate suffix)
// Result: myproject-prod-command-createusercommand (removes last "Command")
// Class: UserCreatedEvent
// Result: myproject-prod-event-usercreated
// Class: UserCreatedEventEvent (duplicate suffix)
// Result: myproject-prod-event-usercreatedevent (removes last "Event")
Advanced Configuration
services.Configure<MessageBrokerSettings>(options =>
{
options.Host = "rabbitmq.example.com";
options.Port = 5672;
options.Username = "myuser";
options.Password = "mypassword";
options.VirtualHost = "/myapp";
options.UseSsl = true;
options.PrefetchCount = 50;
// Retry settings
options.MassTransit.RetryPolicy.RetryLimit = 3;
options.MassTransit.RetryPolicy.InitialIntervalMs = 1000;
options.MassTransit.RetryPolicy.MaxIntervalMs = 30000;
});
๐ ๏ธ Context and Multi-Tenancy
The library provides built-in context support for cross-cutting concerns:
Context Helper Utilities
using TAF.Infra.MassTransit.Helpers;
// Create context from HTTP headers (in controllers/middleware)
var context = ContextHelper.CreateFromHeaders(Request.Headers);
// Create context from parameters (in services/background jobs)
var context = ContextHelper.CreateFromParameters(
userId: "user-123",
tenantId: "tenant-456",
appId: "web-api",
environmentId: "production",
correlationId: Guid.NewGuid(),
properties: new Dictionary<string, object>
{
{ "TraceId", "trace-789" },
{ "ClientIP", "192.168.1.1" }
}
);
// Manual context creation
var context = new Context
{
CorrelationId = Guid.NewGuid(),
TenantId = "tenant-123",
AppId = "web-app",
UserId = "user-456",
EnvironmentId = "production"
};
// Add custom properties
context.SetProperty("TraceId", "trace-789");
context.SetProperty("ClientIP", "192.168.1.1");
Context in Middleware
public class ContextMiddleware
{
public async Task InvokeAsync(HttpContext httpContext, RequestDelegate next)
{
// Automatically extract context from headers
var messageContext = ContextHelper.CreateFromHeaders(httpContext.Request.Headers);
// Store in some context provider for later use
using var scope = new ContextScope(messageContext);
await next(httpContext);
}
}
๐ Execution Patterns
Local Execution (MediatR)
- Fast, in-process execution
- Suitable for simple operations
- Automatic transaction boundaries
- Commands and Queries
Distributed Execution (MassTransit)
- Resilient, scalable execution
- Cross-service communication
- Built-in retry and error handling
- Commands and Events only
๐งช Testing Support
// Use In-Memory provider for testing
services.AddMessageBusWithNaming(
options =>
{
options.Provider = MessageBusConfigurationEnum.InMemory;
},
projectName: "TestProject",
environment: "test"
);
๐ง Advanced Features
Custom Base Classes
public class AuditableCommand : BaseCommand
{
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public string CreatedBy { get; set; }
}
public class CreateOrderCommand : AuditableCommand
{
public List<OrderItem> Items { get; set; }
public decimal Total { get; set; }
}
Multiple Handlers
// Multiple event handlers for the same event
public class EmailNotificationHandler : IEventHandler<UserCreatedEvent> { ... }
public class AuditLogHandler : IEventHandler<UserCreatedEvent> { ... }
public class MetricsHandler : IEventHandler<UserCreatedEvent> { ... }
Queue Name Resolution
// The library automatically handles complex naming scenarios:
// Full class name: MyApp.Commands.User.CreateUserCommand
// Generated queue: myapp-prod-command-createuser
// Full class name: MyApp.Events.Order.OrderShippedEvent
// Generated topic: myapp-prod-event-ordershipped
// Duplicate suffix handling:
// CreateUserCommandCommand โ myapp-prod-command-createusercommand
// UserCreatedEventEvent โ myapp-prod-event-usercreatedevent
๐ API Reference
Core Interfaces
ICommandBus
- Command execution (local/distributed)IQueryBus
- Query execution (local only)IEventBus
- Event publishing (distributed)ICommandHandler<T>
- Command handler interfaceICommandHandler<T, R>
- Command handler with response interfaceIQueryHandler<T, R>
- Query handler interfaceIEventHandler<T>
- Event handler interface
Base Classes
BaseCommand
- Base class for commandsBaseCommand<T>
- Base class for commands with responseBaseQuery<T>
- Base class for queriesBaseEvent
- Base class for events
Helper Classes
ContextHelper
- Static helper for context creationMessageNamingConfiguration
- Configuration for queue/topic namingContextScope
- Scoped context management
Configuration Classes
MessageBusConfiguration
- Main configurationMessageBrokerSettings
- Broker-specific settingsMessageNamingConfiguration
- Naming convention settings
๐ค Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
๐ License
This project is licensed under the MIT License.
๐ Issues
If you encounter any issues, please file them in the GitHub Issues section.
Built with โค๏ธ for distributed systems
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- MassTransit (>= 8.5.2)
- MassTransit.AmazonSQS (>= 8.5.2)
- MassTransit.Azure.ServiceBus.Core (>= 8.5.2)
- MassTransit.RabbitMQ (>= 8.5.2)
- MediatR (>= 13.0.0)
- Microsoft.AspNetCore.Http.Features (>= 5.0.17)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.8)
- Microsoft.Extensions.Configuration.Binder (>= 8.0.2)
- Newtonsoft.Json (>= 13.0.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Initial release featuring:
- Unified CQRS pattern with Commands, Queries, and Events
- Dual execution modes (local via MediatR, distributed via MassTransit)
- Support for RabbitMQ, Azure Service Bus, Amazon SQS, and In-Memory brokers
- Built-in context propagation for multi-tenant scenarios
- Automatic handler discovery and registration
- Configurable retry policies and error handling
- Type-safe message contracts and handlers