ApiFeatures.Mqtt.Apis
9.0.1
dotnet add package ApiFeatures.Mqtt.Apis --version 9.0.1
I. Overview
ApiFeatures.Mqtt.Apis is a .NET 9 library for managing MQTT connections and processing MQTT messages with MongoDB persistence. The library provides a complete API for creating, updating, and managing MQTT broker configurations with support for custom message parsers and processors.
Key Features
- MQTT Broker Management: Configure and manage multiple MQTT broker connections
- HiveMQ Cloud Support: Built-in support for HiveMQ Cloud MQTT brokers
- Event-Driven Message Processing: Process incoming MQTT messages with custom parsers and processors
- MongoDB Persistence: Store MQTT configurations in MongoDB
- RESTful API Endpoints: Full CRUD operations for MQTT configuration management
- Connection Management: Start/stop MQTT connections via API
- Custom Message Processing: Extensible message parser and processor pattern
- FluentValidation: Built-in validation for all API requests
- Keyed Services: Uses .NET's keyed service pattern for flexible dependency injection
- Hosted Services: Automatic MQTT connection lifecycle management
Use Cases
- Connect to IoT device MQTT brokers
- Process sensor data from MQTT topics
- Integrate with cloud MQTT services (HiveMQ, AWS IoT, Azure IoT Hub)
- Build real-time data pipelines from MQTT sources
- Manage multiple MQTT connections from a single application
II. Installation
Step 1: Add Package Reference
Add the ApiFeatures.Mqtt.Apis package to your project:
<ItemGroup>
<PackageReference Include="ApiFeatures.Mqtt.Apis" Version="9.0.1" />
</ItemGroup>
Step 2: Register Services
In your Program.cs, configure and register the MQTT module:
using ApiFeatures.Mqtt.Apis.Extensions;
using ApiFeatures.Mqtt.Apis.Models;
using ApiFeatures.Mqtt.Apis.Models.Options;
using IotVn.CoreFeatures.Services;
using Microsoft.AspNetCore.Authorization; // AuthorizationPolicyBuilder

var builder = WebApplication.CreateBuilder(args);

// Configure MQTT options
var options = new AddMqttsFeatureOptions();

// Required: Configure MongoDB database
options.WithDatabaseOptions(new MongoMqttDatabaseOptions
{
    ConnectionString = "mongodb://localhost:27017",
    DatabaseName = "mqtt_db"
});

// Required: Configure JSON tool for message parsing
options.WithJsonTool<JsonTool>();
// Or with factory:
// options.WithJsonTool(sp => new JsonTool());

// Optional: Register custom message parser
options.WithMqttMessageParser<MyMqttMessageParser>();
// Or with factory (resolve the constructor dependencies from the provider):
// options.WithMqttMessageParser(sp => new MyMqttMessageParser(
//     sp.GetRequiredService<ILogger<MyMqttMessageParser>>()));

// Optional: Register custom message processor
options.WithMqttMessageProcessor<MyMqttMessageProcessor>();
// Or with factory:
// options.WithMqttMessageProcessor(sp => new MyMqttMessageProcessor(
//     sp.GetRequiredService<ILogger<MyMqttMessageProcessor>>(),
//     sp.GetRequiredService<IMyDataService>()));

// Register the module
builder.Services.AddMqttsModule(options);

var app = builder.Build();

// Map endpoints (authorization is optional)
app.AddMqttsEndpoints(new AddMqttsEndpointsOptions
{
    AuthorizationPolicyHandler = () => new AuthorizationPolicyBuilder()
        .RequireAuthenticatedUser()
        .Build()
});

app.Run();
Step 3: Implement Custom Message Parser (Optional)
Create a parser to transform raw MQTT messages:
using System.Text.Json;
using ApiFeatures.Mqtt.Apis.Interfaces;
using ApiFeatures.Mqtt.Apis.Models;
using Microsoft.Extensions.Logging;

public class MyMqttMessageParser : IMqttMessageParser
{
    private readonly ILogger<MyMqttMessageParser> _logger;

    public MyMqttMessageParser(ILogger<MyMqttMessageParser> logger)
    {
        _logger = logger;
    }

    // Nothing is awaited here, so the method returns completed tasks instead of being marked async.
    public Task<MqttMessage?> ParseAsync(
        MqttMessage mqttMessage,
        string rawMessage,
        CancellationToken cancellationToken = default)
    {
        try
        {
            _logger.LogInformation("Parsing MQTT message from topic: {Topic}", mqttMessage.Event);

            // Parse the raw MQTT payload; Deserialize returns null for a JSON "null" literal
            var parsedData = JsonSerializer.Deserialize<MyDataType>(rawMessage);
            if (parsedData is null)
            {
                return Task.FromResult<MqttMessage?>(null);
            }

            // Create strongly-typed message
            var typedMessage = new MqttMessage<MyDataType>(
                mqttMessage.MessageId,
                mqttMessage.Event)
            {
                Data = new[] { parsedData },
                MacAddress = parsedData.DeviceId,
                Model = parsedData.Model,
                Version = parsedData.Version
            };

            return Task.FromResult<MqttMessage?>(typedMessage);
        }
        catch (Exception ex)
        {
            // Returning null signals that this parser could not handle the message
            _logger.LogError(ex, "Failed to parse MQTT message");
            return Task.FromResult<MqttMessage?>(null);
        }
    }
}
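The parser above deserializes into MyDataType, which this README does not define. As a minimal sketch, assuming the payload is a JSON object with camelCase fields, the type and a null-safe read helper might look like the following. The record shape, the Value field, and the PayloadReader helper are all hypothetical illustrations, not part of the library.

```csharp
using System.Text.Json;

// Hypothetical payload shape; DeviceId, Model and Version mirror what the parser reads.
public sealed record MyDataType(string DeviceId, string Model, string Version, double Value);

public static class PayloadReader
{
    // Web defaults: camelCase property names and case-insensitive matching,
    // so {"deviceId": "..."} binds to DeviceId.
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    // Returns null instead of throwing when the payload is not valid JSON.
    public static MyDataType? TryRead(string rawMessage)
    {
        try
        {
            return JsonSerializer.Deserialize<MyDataType>(rawMessage, Options);
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

System.Text.Json matches property names case-sensitively by default, so a camelCase payload would leave PascalCase properties unset unless options like these (or JsonPropertyName attributes) are used.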
Step 4: Implement Message Processor (Optional)
Create a processor to handle parsed MQTT messages:
using ApiFeatures.Mqtt.Apis.Interfaces;
using ApiFeatures.Mqtt.Apis.Models;
using Microsoft.Extensions.Logging;

public class MyMqttMessageProcessor : IMqttMessageProcessor
{
    private readonly ILogger<MyMqttMessageProcessor> _logger;
    private readonly IMyDataService _dataService;

    public MyMqttMessageProcessor(
        ILogger<MyMqttMessageProcessor> logger,
        IMyDataService dataService)
    {
        _logger = logger;
        _dataService = dataService;
    }

    // Synchronous check, so return a completed task rather than marking the method async
    public Task<bool> CanExecuteAsync(
        MqttMessage mqttMessage,
        CancellationToken cancellationToken = default)
    {
        // Check if this processor can handle this message type
        return Task.FromResult(mqttMessage.Event.StartsWith("sensors/", StringComparison.Ordinal));
    }

    public async Task ProcessAsync(
        MqttMessage mqttMessage,
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Processing MQTT message: {MessageId}", mqttMessage.MessageId);
        try
        {
            if (mqttMessage is MqttMessage<MyDataType> typedMessage)
            {
                // Process the message data
                foreach (var data in typedMessage.Data ?? Array.Empty<MyDataType>())
                {
                    await _dataService.ProcessDataAsync(data, cancellationToken);
                }
            }
            _logger.LogInformation("Successfully processed message: {MessageId}", mqttMessage.MessageId);
        }
        catch (Exception ex)
        {
            // Log with context, then rethrow so the caller can see the failure
            _logger.LogError(ex, "Failed to process MQTT message: {MessageId}", mqttMessage.MessageId);
            throw;
        }
    }
}
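The processor above filters with a plain prefix check on "sensors/". If a processor should instead follow MQTT subscription semantics, a topic-filter matcher is one option. The sketch below is a hypothetical helper (MqttTopics.Matches is not a library API) that implements the standard single-level (+) and multi-level (#) wildcards. It deliberately ignores the special rule for topics beginning with $.

```csharp
using System;

public static class MqttTopics
{
    // Returns true when `topic` matches the MQTT subscription `filter`.
    // '+' matches exactly one level; '#' matches all remaining levels and must be last.
    public static bool Matches(string filter, string topic)
    {
        var filterLevels = filter.Split('/');
        var topicLevels = topic.Split('/');

        for (var i = 0; i < filterLevels.Length; i++)
        {
            if (filterLevels[i] == "#")
            {
                // Only valid as the final level; it also matches the parent level itself.
                return i == filterLevels.Length - 1;
            }
            if (i >= topicLevels.Length)
            {
                return false; // filter is longer than the topic
            }
            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            {
                return false; // literal level differs
            }
        }

        // No '#' was seen, so both must have the same number of levels.
        return filterLevels.Length == topicLevels.Length;
    }
}
```

Inside CanExecuteAsync this could be used as Task.FromResult(MqttTopics.Matches("sensors/+", mqttMessage.Event)), assuming Event carries the topic name as the logging call in the parser suggests.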
III. Configuration Options
AddMqttsFeatureOptions Methods
WithDatabaseOptions(MongoMqttDatabaseOptions options) [REQUIRED]
Configures MongoDB database connection for storing MQTT configurations.
Parameters:
- ConnectionString (string, required): MongoDB connection string
- DatabaseName (string, required): Name of the database
- Collections (MongoMqttDatabaseCollectionOptions, optional): Custom collection names
Example:
options.WithDatabaseOptions(new MongoMqttDatabaseOptions
{
    ConnectionString = "mongodb://localhost:27017",
    DatabaseName = "mqtt_db",
    Collections = new MongoMqttDatabaseCollectionOptions
    {
        Mqtts = "CustomMqttsCollection", // Default: "Mqtts"
        MqttConnections = "CustomConnectionsCollection" // Default: "MqttConnections"
    }
});
WithJsonTool<T>() or WithJsonTool(Func<IServiceProvider, IJsonTool>) [REQUIRED]
Registers the JSON serialization tool for MQTT message parsing.
Generic version:
options.WithJsonTool<JsonTool>();
Factory version:
options.WithJsonTool(sp => new JsonTool());
// Or custom implementation:
options.WithJsonTool(sp => new CustomJsonTool());
Parameters:
- T: Type that implements IJsonTool
- factory: Factory function that returns an IJsonTool instance
Lifetime: Singleton (keyed with ServiceKeys.Default)
WithMqttMessageParser<T>() or WithMqttMessageParser(Func<IServiceProvider, IMqttMessageParser>) [OPTIONAL]
Registers custom MQTT message parsers for transforming raw messages.
Generic version:
options.WithMqttMessageParser<MyMqttMessageParser>();
Factory version:
options.WithMqttMessageParser(sp => new MyMqttMessageParser(
    sp.GetRequiredService<ILogger<MyMqttMessageParser>>()
));
Parser Interface:
public interface IMqttMessageParser
{
    Task<MqttMessage?> ParseAsync(
        MqttMessage mqttMessage,
        string rawMessage,
        CancellationToken cancellationToken = default);
}
When Used:
- Called when a new MQTT message arrives from subscribed topics
- Transforms raw string payload into strongly-typed MqttMessage objects
Parameters:
- T: Type that implements IMqttMessageParser
- factory: Factory function that returns an IMqttMessageParser instance
Lifetime: Singleton (keyed with ServiceKeys.Default)
Note: Multiple message parsers can be registered by calling this method multiple times.
WithMqttMessageProcessor<T>() or WithMqttMessageProcessor(Func<IServiceProvider, IMqttMessageProcessor>) [OPTIONAL]
Registers custom MQTT message processors for handling parsed messages.
Generic version:
options.WithMqttMessageProcessor<MyMqttMessageProcessor>();
Factory version:
options.WithMqttMessageProcessor(sp => new MyMqttMessageProcessor(
    sp.GetRequiredService<ILogger<MyMqttMessageProcessor>>(),
    sp.GetRequiredService<IMyDataService>()
));
Processor Interface:
public interface IMqttMessageProcessor
{
    Task<bool> CanExecuteAsync(MqttMessage mqttMessage, CancellationToken cancellationToken = default);
    Task ProcessAsync(MqttMessage mqttMessage, CancellationToken cancellationToken = default);
}
When Used:
- Called after a message is successfully parsed
- Checks CanExecuteAsync first to determine if the processor can handle the message
- If true, calls ProcessAsync to handle the message
Parameters:
- T: Type that implements IMqttMessageProcessor
- factory: Factory function that returns an IMqttMessageProcessor instance
Lifetime: Scoped (keyed with ServiceKeys.Default)
Note: Multiple message processors can be registered. All processors with CanExecuteAsync returning true will be invoked.
IV. API Endpoints
MQTT Configuration Management
GET /api/mqtts
Get all MQTT configurations.
Response:
[
{
"id": "018d1234-5678-7abc-def0-123456789abc",
"name": "IoT Sensors HiveMQ",
"description": "MQTT connection for IoT sensor data",
"topics": [
"sensors/temperature",
"sensors/humidity"
],
"options": {
"type": "CloudHiveMq",
"hostname": "abc123.s1.eu.hivemq.cloud",
"port": 8883,
"clientId": "iot-gateway-001",
"username": "mqtt_user",
"password": "***"
},
"createdTime": "2024-01-15T10:30:00Z",
"updatedTime": "2024-01-20T14:45:00Z"
}
]
GET /api/mqtt/{id}
Get a specific MQTT configuration by ID.
Path Parameters:
- id (Guid, required): MQTT configuration ID
Response:
{
"id": "018d1234-5678-7abc-def0-123456789abc",
"name": "IoT Sensors HiveMQ",
"description": "MQTT connection for IoT sensor data",
"topics": [
"sensors/temperature",
"sensors/humidity"
],
"options": {
"type": "CloudHiveMq",
"hostname": "abc123.s1.eu.hivemq.cloud",
"port": 8883,
"clientId": "iot-gateway-001",
"username": "mqtt_user",
"password": "***"
},
"createdTime": "2024-01-15T10:30:00Z"
}
POST /api/mqtt
Create a new MQTT configuration.
Request Body:
{
"name": "IoT Sensors HiveMQ",
"description": "MQTT connection for IoT sensor data",
"topics": [
"sensors/temperature",
"sensors/humidity"
],
"options": {
"hostname": "abc123.s1.eu.hivemq.cloud",
"port": 8883,
"clientId": "iot-gateway-001",
"username": "mqtt_user",
"password": "secure_password_here"
}
}
Response:
{
"id": "018d1234-5678-7abc-def0-123456789abc",
"name": "IoT Sensors HiveMQ",
"description": "MQTT connection for IoT sensor data",
"topics": [
"sensors/temperature",
"sensors/humidity"
],
"options": {
"type": "CloudHiveMq",
"hostname": "abc123.s1.eu.hivemq.cloud",
"port": 8883,
"clientId": "iot-gateway-001",
"username": "mqtt_user",
"password": "***"
},
"createdTime": "2024-01-15T10:30:00Z"
}
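As an illustration of calling the create endpoint from .NET, the sketch below builds the POST request with HttpClient types. The record names, the MqttApiRequests helper, and the base address are hypothetical; only the path and the JSON field names come from the request body documented above.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;

// Hypothetical client-side DTOs mirroring the documented request body.
public sealed record MqttBrokerOptionsBody(
    string Hostname, int Port, string ClientId, string Username, string Password);

public sealed record CreateMqttBody(
    string Name, string Description, string[] Topics, MqttBrokerOptionsBody Options);

public static class MqttApiRequests
{
    // Builds POST {baseAddress}api/mqtt with a JSON body.
    // JsonContent uses web defaults, so property names are written in camelCase.
    public static HttpRequestMessage BuildCreate(Uri baseAddress, CreateMqttBody body) =>
        new(HttpMethod.Post, new Uri(baseAddress, "api/mqtt"))
        {
            Content = JsonContent.Create(body)
        };
}
```

The request can then be sent with an HttpClient instance, for example await client.SendAsync(request), followed by EnsureSuccessStatusCode() on the response. If the endpoints were mapped with an authorization policy, the request also needs whatever credentials that policy expects.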
PUT /api/mqtt/{id}
Update an existing MQTT configuration.
Path Parameters:
- id (Guid, required): MQTT configuration ID
Request Body (all fields optional):
{
"name": "Updated MQTT Name",
"description": "Updated description",
"topics": [
"sensors/all"
],
"hostname": "new-broker.hivemq.cloud",
"port": 8883,
"clientId": "new-client-id",
"username": "new_user",
"password": "new_password"
}
Response:
{
"id": "018d1234-5678-7abc-def0-123456789abc",
"name": "Updated MQTT Name",
"description": "Updated description",
"topics": [
"sensors/all"
],
"options": {
"type": "CloudHiveMq",
"hostname": "new-broker.hivemq.cloud",
"port": 8883,
"clientId": "new-client-id",
"username": "new_user",
"password": "***"
},
"createdTime": "2024-01-15T10:30:00Z",
"updatedTime": "2024-01-20T15:00:00Z"
}
Note: Only send the fields you want to update. Null/missing fields won't be updated.
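One way to send only the changed fields from a .NET client is to model every field as nullable and skip nulls during serialization. This is a sketch under that assumption: UpdateMqttBody and PartialUpdate are hypothetical client-side names, and the field names follow the request body shown above.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical client-side DTO: every field is optional, null means "leave unchanged".
public sealed record UpdateMqttBody(
    string? Name = null,
    string? Description = null,
    string[]? Topics = null,
    string? Hostname = null,
    int? Port = null,
    string? ClientId = null,
    string? Username = null,
    string? Password = null);

public static class PartialUpdate
{
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web)
    {
        // Omit null properties so the server only sees fields that should change.
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
    };

    public static string ToJson(UpdateMqttBody body) => JsonSerializer.Serialize(body, Options);
}
```

For example, PartialUpdate.ToJson(new UpdateMqttBody(Name: "Updated MQTT Name")) produces a body containing only the name field, which can be sent as the PUT payload.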
DELETE /api/mqtt/{id}
Delete an MQTT configuration.
Path Parameters:
- id (Guid, required): MQTT configuration ID
Response: 204 No Content
POST /api/mqtt/{id}/connection
Start an MQTT connection (connect to the broker and subscribe to topics).
Path Parameters:
- id (Guid, required): MQTT configuration ID
Response: 200 OK
Effects:
- Connects to the MQTT broker specified in the configuration
- Subscribes to all configured topics
- Updates connection status to "Connected" in the database
DELETE /api/mqtt/{id}/connection
Stop an MQTT connection (disconnect from the broker).
Path Parameters:
- id (Guid, required): MQTT configuration ID
Response: 200 OK
Effects:
- Disconnects from the MQTT broker
- Unsubscribes from all topics
- Updates connection status to "Disconnected" in the database
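The start and stop endpoints share one path and differ only in the HTTP method. A minimal client-side sketch, assuming a hypothetical helper class and base address, could build both requests like this:

```csharp
using System;
using System.Net.Http;

public static class MqttConnectionRequests
{
    // POST api/mqtt/{id}/connection starts the connection.
    public static HttpRequestMessage BuildStart(Uri baseAddress, Guid id) =>
        new(HttpMethod.Post, ConnectionUri(baseAddress, id));

    // DELETE api/mqtt/{id}/connection stops the connection.
    public static HttpRequestMessage BuildStop(Uri baseAddress, Guid id) =>
        new(HttpMethod.Delete, ConnectionUri(baseAddress, id));

    // Guid.ToString() yields the hyphenated lowercase form used in the documented responses.
    private static Uri ConnectionUri(Uri baseAddress, Guid id) =>
        new(baseAddress, $"api/mqtt/{id}/connection");
}
```

Both requests carry no body. After sending one with HttpClient.SendAsync, a 200 OK response indicates the call was accepted, per the endpoint descriptions above.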
V. Advanced Configuration
Multiple Message Parsers
You can register multiple message parsers to handle different message formats:
options
.WithMqttMessageParser<SensorDataParser>()
.WithMqttMessageParser<DeviceStatusParser>()
.WithMqttMessageParser<AlertMessageParser>();
The MQTT message service will try each parser in order until one successfully parses the message.
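The "first parser that succeeds wins" behavior described above can be sketched independently of the library. The code below is not the library's implementation; it models each parser as a hypothetical delegate that returns null when it cannot handle the payload, which matches the contract of ParseAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParserChain
{
    // Tries each parser in registration order and returns the first non-null result.
    // Returns null when no parser recognizes the payload.
    public static async Task<T?> ParseFirstAsync<T>(
        IEnumerable<Func<string, CancellationToken, Task<T?>>> parsers,
        string rawMessage,
        CancellationToken cancellationToken = default)
        where T : class
    {
        foreach (var parser in parsers)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var parsed = await parser(rawMessage, cancellationToken);
            if (parsed is not null)
            {
                return parsed; // later parsers are not consulted
            }
        }

        return null;
    }
}
```

Under this model, registration order matters: a permissive parser registered early can shadow a more specific one registered after it.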
Multiple Message Processors
You can register multiple message processors for different processing pipelines:
options
.WithMqttMessageProcessor<LoggingProcessor>() // First: Log all messages
.WithMqttMessageProcessor<ValidationProcessor>() // Second: Validate data
.WithMqttMessageProcessor<DataStorageProcessor>() // Third: Store in database
.WithMqttMessageProcessor<AlertingProcessor>(); // Fourth: Send alerts
All processors with CanExecuteAsync returning true will process each message.
Connection Lifecycle
The MQTT module includes hosted services that automatically manage connection lifecycles:
- MqttHostedService: Manages the overall MQTT service lifecycle
- MqttConnectionHostedService: Monitors and maintains active MQTT connections
VI. Configuration File Example
appsettings.json
{
"Mqtt": {
"MongoDatabase": {
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "mqtt_db"
}
},
"Logging": {
"LogLevel": {
"ApiFeatures.Mqtt.Apis": "Debug"
}
}
}
Program.cs with Configuration Binding
var mongoOptions = builder.Configuration
.GetSection("Mqtt:MongoDatabase")
.Get<MongoMqttDatabaseOptions>()!;
options.WithDatabaseOptions(mongoOptions);
VII. Dependencies
Required Packages
- MongoDB.Driver (3.5.2+)
- MQTTnet (5.0.1.1416+)
- FluentValidation (12.1.1+)
- Newtonsoft.Json (13.0.4+)
- IotVn.CoreFeatures (9.1.0+)
- ApiFeatures.Modules.Cores (9.0.1+)
- BusinessFeatures.Cores (9.0.1+)
Framework
- .NET 9.0
VIII. Features
Automatic Connection Management
- Auto-start on demand: Start MQTT connections via API endpoints
- Auto-reconnect: Automatic reconnection on connection loss
- Graceful shutdown: Proper cleanup when stopping connections
Thread-Safe Operations
- All MQTT operations are thread-safe
- Concurrent message processing is handled safely
GUID Serialization
- Uses GuidRepresentation.CSharpLegacy for MongoDB compatibility
- Automatically configures GUID serialization on database context initialization
Validation
All API endpoints use FluentValidation:
- MQTT configuration validation
- Unique MQTT configuration names
- Required fields validation
- Connection parameter validation
Security
- Passwords are masked in API responses (displayed as ***)
- Secure TLS connections to MQTT brokers
- Username/password authentication support
IX. Error Handling
Common Exceptions
| Exception | When Thrown |
|---|---|
| MqttByIdNotFoundException | MQTT configuration with specified ID not found |
| ValidationException | Request validation failed |
| MqttConnectionException | Failed to connect to MQTT broker |
X. Service Keys
The module uses keyed services for dependency injection:
| Service | Key | Lifetime |
|---|---|---|
| IValidator<*> | ServiceKeys.Default | Scoped |
| IJsonTool | ServiceKeys.Default | Singleton |
| MongoMqttDatabaseContext | ServiceKeys.Default | Scoped |
| IMqttService | ServiceKeys.Default | Scoped |
| IMqttConnectionService | ServiceKeys.Default | Scoped |
| IHiveMqService | ServiceKeys.Default | Singleton |
| IMqttMessageParser | ServiceKeys.Default | Singleton |
| IMqttMessageProcessor | ServiceKeys.Default | Scoped |
ServiceKeys Values:
- ServiceKeys.Default = "ApiFeatures.Mqtt.Apis"
XI. Message Flow
Incoming MQTT Message Processing
- Message Received: MQTT client receives message from subscribed topic
- Message Parsing: Each registered parser attempts to parse the raw message
- Message Processing: All processors with CanExecuteAsync returning true process the message
- Error Handling: Errors are logged but don't stop other processors
MQTT Broker
↓
HiveMqService (receives raw message)
↓
MqttMessageService
↓
┌─────────────────────────┐
│ Message Parsers │
│ (try in order) │
│ - SensorDataParser │
│ - DeviceStatusParser │
│ - AlertMessageParser │
└─────────────────────────┘
↓
┌─────────────────────────┐
│ Message Processors │
│ (all with CanExecute) │
│ - LoggingProcessor │
│ - ValidationProcessor │
│ - DataStorageProcessor │
│ - AlertingProcessor │
└─────────────────────────┘
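The processor stage of this flow (every processor whose CanExecuteAsync returns true runs, and one failure does not block the rest) can be sketched as follows. This is an illustration of the described behavior, not the library's code: IProcessor, DelegateProcessor, and ProcessorDispatch are hypothetical stand-ins, with IProcessor mirroring the shape of IMqttMessageProcessor.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in with the same two-method shape as IMqttMessageProcessor.
public interface IProcessor<TMessage>
{
    Task<bool> CanExecuteAsync(TMessage message, CancellationToken ct = default);
    Task ProcessAsync(TMessage message, CancellationToken ct = default);
}

// Small adapter so processors can be written as lambdas in examples and tests.
public sealed class DelegateProcessor<TMessage> : IProcessor<TMessage>
{
    private readonly Func<TMessage, bool> _canExecute;
    private readonly Func<TMessage, Task> _process;

    public DelegateProcessor(Func<TMessage, bool> canExecute, Func<TMessage, Task> process)
    {
        _canExecute = canExecute;
        _process = process;
    }

    public Task<bool> CanExecuteAsync(TMessage message, CancellationToken ct = default) =>
        Task.FromResult(_canExecute(message));

    public Task ProcessAsync(TMessage message, CancellationToken ct = default) =>
        _process(message);
}

public static class ProcessorDispatch
{
    // Runs every willing processor in order. Failures are collected and returned
    // (so the caller can log them) instead of aborting the remaining processors.
    public static async Task<IReadOnlyList<Exception>> DispatchAsync<TMessage>(
        IEnumerable<IProcessor<TMessage>> processors,
        TMessage message,
        CancellationToken ct = default)
    {
        var errors = new List<Exception>();

        foreach (var processor in processors)
        {
            try
            {
                if (await processor.CanExecuteAsync(message, ct))
                {
                    await processor.ProcessAsync(message, ct);
                }
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                errors.Add(ex); // isolate the failure; cancellation still propagates
            }
        }

        return errors;
    }
}
```

A consequence of this design is that processors cannot rely on an earlier processor having succeeded, which is one reason to keep them independent.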
XII. Best Practices
1. Parser Design
- Keep parsers focused on a single message format
- Return null if the parser can't handle the message
- Log parsing errors for debugging
- Use strongly-typed MqttMessage<T> for parsed messages
2. Processor Design
- Implement CanExecuteAsync to filter messages efficiently
- Keep processors independent and stateless when possible
- Handle exceptions gracefully
- Log processing activities for monitoring
3. Connection Management
- Use descriptive names for MQTT configurations
- Document topic patterns in descriptions
- Test connections before deploying
- Monitor connection status regularly
4. Security
- Store MQTT credentials securely (use secrets management)
- Use TLS/SSL connections to brokers (port 8883)
- Rotate credentials periodically
- Limit topic subscriptions to necessary topics only
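For the first point, one simple approach is to read broker credentials from the environment (or a secrets store that populates it) rather than hard-coding them in request bodies or source files. The helper and the variable name below are hypothetical.

```csharp
using System;

public static class MqttSecrets
{
    // Reads a required secret from the environment and fails fast when it is missing,
    // so a misconfigured deployment is caught at startup rather than at connect time.
    public static string Require(string variableName)
    {
        var value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Environment variable '{variableName}' is not set.");
        }

        return value;
    }
}
```

A caller might use MqttSecrets.Require("MQTT_PASSWORD") when building the body for POST /api/mqtt, keeping the literal password out of the codebase.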
XIII. License
[Your License Here]
XIV. Contributing
[Your Contributing Guidelines Here]
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net9.0
  - BusinessFeatures.Cores (>= 9.0.1)
  - FluentValidation (>= 12.1.1)
  - IotVn.CoreFeatures (>= 9.1.0)
  - Microsoft.AspNetCore.Http.Abstractions (>= 2.3.9)
  - Microsoft.AspNetCore.OpenApi (>= 9.0.11)
  - Microsoft.Extensions.Hosting.Abstractions (>= 9.0.11)
  - MongoDB.Driver (>= 3.5.2)
  - MQTTnet (>= 5.0.1.1416)
| Version | Downloads | Last Updated |
|---|---|---|
| 9.0.1 | 36 | 5/7/2026 |