ApiFeatures.Mqtt.Apis 9.0.1

dotnet add package ApiFeatures.Mqtt.Apis --version 9.0.1
                    
NuGet\Install-Package ApiFeatures.Mqtt.Apis -Version 9.0.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="ApiFeatures.Mqtt.Apis" Version="9.0.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="ApiFeatures.Mqtt.Apis" Version="9.0.1" />
                    
Directory.Packages.props
<PackageReference Include="ApiFeatures.Mqtt.Apis" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add ApiFeatures.Mqtt.Apis --version 9.0.1
                    
#r "nuget: ApiFeatures.Mqtt.Apis, 9.0.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package ApiFeatures.Mqtt.Apis@9.0.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=ApiFeatures.Mqtt.Apis&version=9.0.1
                    
Install as a Cake Addin
#tool nuget:?package=ApiFeatures.Mqtt.Apis&version=9.0.1
                    
Install as a Cake Tool

ApiFeatures.Mqtt.Apis

I. Overview

ApiFeatures.Mqtt.Apis is a .NET 9 library for managing MQTT connections and processing MQTT messages with MongoDB persistence. The library provides a complete API for creating, updating, and managing MQTT broker configurations with support for custom message parsers and processors.

Key Features

  • MQTT Broker Management: Configure and manage multiple MQTT broker connections
  • HiveMQ Cloud Support: Built-in support for HiveMQ Cloud MQTT brokers
  • Event-Driven Message Processing: Process incoming MQTT messages with custom parsers and processors
  • MongoDB Persistence: Store MQTT configurations in MongoDB
  • RESTful API Endpoints: Full CRUD operations for MQTT configuration management
  • Connection Management: Start/stop MQTT connections via API
  • Custom Message Processing: Extensible message parser and processor pattern
  • FluentValidation: Built-in validation for all API requests
  • Keyed Services: Uses .NET's keyed service pattern for flexible dependency injection
  • Hosted Services: Automatic MQTT connection lifecycle management

Use Cases

  • Connect to IoT device MQTT brokers
  • Process sensor data from MQTT topics
  • Integrate with cloud MQTT services (HiveMQ, AWS IoT, Azure IoT Hub)
  • Build real-time data pipelines from MQTT sources
  • Manage multiple MQTT connections from a single application

II. Installation

Step 1: Add Package Reference

Add the ApiFeatures.Mqtt.Apis package to your project:

<ItemGroup>
  <PackageReference Include="ApiFeatures.Mqtt.Apis" Version="*" />
</ItemGroup>

Step 2: Register Services

In your Program.cs, configure and register the MQTT module:

using ApiFeatures.Mqtt.Apis.Extensions;
using ApiFeatures.Mqtt.Apis.Models;
using ApiFeatures.Mqtt.Apis.Models.Options;
using IotVn.CoreFeatures.Services;

var builder = WebApplication.CreateBuilder(args);

// Configure MQTT options
var options = new AddMqttsFeatureOptions();

// Required: Configure MongoDB database
options.WithDatabaseOptions(new MongoMqttDatabaseOptions
{
    ConnectionString = "mongodb://localhost:27017",
    DatabaseName = "mqtt_db"
});

// Required: Configure JSON tool for message parsing
options.WithJsonTool<JsonTool>();
// Or with factory:
// options.WithJsonTool(sp => new JsonTool());

// Optional: Register custom message parser
options.WithMqttMessageParser<MyMqttMessageParser>();
// Or with factory:
// options.WithMqttMessageParser(sp => new MyMqttMessageParser(sp));

// Optional: Register custom message processor
options.WithMqttMessageProcessor<MyMqttMessageProcessor>();
// Or with factory:
// options.WithMqttMessageProcessor(sp => new MyMqttMessageProcessor(sp));

// Register the module
builder.Services.AddMqttsModule(options);

var app = builder.Build();

// Map endpoints (optional authorization)
app.AddMqttsEndpoints(new AddMqttsEndpointsOptions
{
    AuthorizationPolicyHandler = () => new AuthorizationPolicyBuilder()
        .RequireAuthenticatedUser()
        .Build()
});

app.Run();

Step 3: Implement Custom Message Parser (Optional)

Create a parser to transform raw MQTT messages:

using ApiFeatures.Mqtt.Apis.Interfaces;
using ApiFeatures.Mqtt.Apis.Models;

public class MyMqttMessageParser : IMqttMessageParser
{
    private readonly ILogger<MyMqttMessageParser> _logger;

    public MyMqttMessageParser(ILogger<MyMqttMessageParser> logger)
    {
        _logger = logger;
    }

    public async Task<MqttMessage?> ParseAsync(
        MqttMessage mqttMessage, 
        string rawMessage, 
        CancellationToken cancellationToken = default)
    {
        try
        {
            _logger.LogInformation("Parsing MQTT message from topic: {Topic}", mqttMessage.Event);
            
            // Parse the raw MQTT message
            var parsedData = JsonSerializer.Deserialize<MyDataType>(rawMessage);
            
            // Create strongly-typed message
            var typedMessage = new MqttMessage<MyDataType>(
                mqttMessage.MessageId, 
                mqttMessage.Event)
            {
                Data = new[] { parsedData },
                MacAddress = parsedData.DeviceId,
                Model = parsedData.Model,
                Version = parsedData.Version
            };
            
            return typedMessage;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to parse MQTT message");
            return null;
        }
    }
}

Step 4: Implement Message Processor (Optional)

Create a processor to handle parsed MQTT messages:

using ApiFeatures.Mqtt.Apis.Interfaces;
using ApiFeatures.Mqtt.Apis.Models;

public class MyMqttMessageProcessor : IMqttMessageProcessor
{
    private readonly ILogger<MyMqttMessageProcessor> _logger;
    private readonly IMyDataService _dataService;

    public MyMqttMessageProcessor(
        ILogger<MyMqttMessageProcessor> logger,
        IMyDataService dataService)
    {
        _logger = logger;
        _dataService = dataService;
    }

    public async Task<bool> CanExecuteAsync(
        MqttMessage mqttMessage, 
        CancellationToken cancellationToken = default)
    {
        // Check if this processor can handle this message type
        return mqttMessage.Event.StartsWith("sensors/");
    }

    public async Task ProcessAsync(
        MqttMessage mqttMessage, 
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Processing MQTT message: {MessageId}", mqttMessage.MessageId);

        try
        {
            if (mqttMessage is MqttMessage<MyDataType> typedMessage)
            {
                // Process the message data
                foreach (var data in typedMessage.Data ?? Array.Empty<MyDataType>())
                {
                    await _dataService.ProcessDataAsync(data, cancellationToken);
                }
            }

            _logger.LogInformation("Successfully processed message: {MessageId}", mqttMessage.MessageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process MQTT message: {MessageId}", mqttMessage.MessageId);
            throw;
        }
    }
}

III. Configuration Options

AddMqttsFeatureOptions Methods

WithDatabaseOptions(MongoMqttDatabaseOptions options) [REQUIRED]

Configures MongoDB database connection for storing MQTT configurations.

Parameters:

  • ConnectionString (string, required): MongoDB connection string
  • DatabaseName (string, required): Name of the database
  • Collections (MongoMqttDatabaseCollectionOptions, optional): Custom collection names

Example:

options.WithDatabaseOptions(new MongoMqttDatabaseOptions
{
    ConnectionString = "mongodb://localhost:27017",
    DatabaseName = "mqtt_db",
    Collections = new MongoMqttDatabaseCollectionOptions
    {
        Mqtts = "CustomMqttsCollection", // Default: "Mqtts"
        MqttConnections = "CustomConnectionsCollection" // Default: "MqttConnections"
    }
});

WithJsonTool<T>() or WithJsonTool(Func<IServiceProvider, IJsonTool>) [REQUIRED]

Registers the JSON serialization tool for MQTT message parsing.

Generic version:

options.WithJsonTool<JsonTool>();

Factory version:

options.WithJsonTool(sp => new JsonTool());
// Or custom implementation:
options.WithJsonTool(sp => new CustomJsonTool());

Parameters:

  • T: Type that implements IJsonTool
  • factory: Factory function that returns an IJsonTool instance

Lifetime: Singleton (keyed with ServiceKeys.Default)


WithMqttMessageParser<T>() or WithMqttMessageParser(Func<IServiceProvider, IMqttMessageParser>) [OPTIONAL]

Registers custom MQTT message parsers for transforming raw messages.

Generic version:

options.WithMqttMessageParser<MyMqttMessageParser>();

Factory version:

options.WithMqttMessageParser(sp => new MyMqttMessageParser(
    sp.GetRequiredService<ILogger<MyMqttMessageParser>>()
));

Parser Interface:

public interface IMqttMessageParser
{
    Task<MqttMessage?> ParseAsync(
        MqttMessage mqttMessage, 
        string rawMessage, 
        CancellationToken cancellationToken = default);
}

When Used:

  • Called when a new MQTT message arrives from subscribed topics
  • Transforms raw string payload into strongly-typed MqttMessage objects

Parameters:

  • T: Type that implements IMqttMessageParser
  • factory: Factory function that returns an IMqttMessageParser instance

Lifetime: Singleton (keyed with ServiceKeys.Default)

Note: Multiple message parsers can be registered by calling this method multiple times.


WithMqttMessageProcessor<T>() or WithMqttMessageProcessor(Func<IServiceProvider, IMqttMessageProcessor>) [OPTIONAL]

Registers custom MQTT message processors for handling parsed messages.

Generic version:

options.WithMqttMessageProcessor<MyMqttMessageProcessor>();

Factory version:

options.WithMqttMessageProcessor(sp => new MyMqttMessageProcessor(
    sp.GetRequiredService<ILogger<MyMqttMessageProcessor>>(),
    sp.GetRequiredService<IMyDataService>()
));

Processor Interface:

public interface IMqttMessageProcessor
{
    Task<bool> CanExecuteAsync(MqttMessage mqttMessage, CancellationToken cancellationToken = default);
    Task ProcessAsync(MqttMessage mqttMessage, CancellationToken cancellationToken = default);
}

When Used:

  • Called after a message is successfully parsed
  • Checks CanExecuteAsync first to determine if processor can handle the message
  • If true, calls ProcessAsync to handle the message

Parameters:

  • T: Type that implements IMqttMessageProcessor
  • factory: Factory function that returns an IMqttMessageProcessor instance

Lifetime: Scoped (keyed with ServiceKeys.Default)

Note: Multiple message processors can be registered. All processors with CanExecuteAsync returning true will be invoked.


IV. API Endpoints

MQTT Configuration Management

GET /api/mqtts

Get all MQTT configurations.

Response:

[
  {
    "id": "018d1234-5678-7abc-def0-123456789abc",
    "name": "IoT Sensors HiveMQ",
    "description": "MQTT connection for IoT sensor data",
    "topics": [
      "sensors/temperature",
      "sensors/humidity"
    ],
    "options": {
      "type": "CloudHiveMq",
      "hostname": "abc123.s1.eu.hivemq.cloud",
      "port": 8883,
      "clientId": "iot-gateway-001",
      "username": "mqtt_user",
      "password": "***"
    },
    "createdTime": "2024-01-15T10:30:00Z",
    "updatedTime": "2024-01-20T14:45:00Z"
  }
]

GET /api/mqtt/{id}

Get a specific MQTT configuration by ID.

Path Parameters:

  • id (Guid, required): MQTT configuration ID

Response:

{
  "id": "018d1234-5678-7abc-def0-123456789abc",
  "name": "IoT Sensors HiveMQ",
  "description": "MQTT connection for IoT sensor data",
  "topics": [
    "sensors/temperature",
    "sensors/humidity"
  ],
  "options": {
    "type": "CloudHiveMq",
    "hostname": "abc123.s1.eu.hivemq.cloud",
    "port": 8883,
    "clientId": "iot-gateway-001",
    "username": "mqtt_user",
    "password": "***"
  },
  "createdTime": "2024-01-15T10:30:00Z"
}

POST /api/mqtt

Create a new MQTT configuration.

Request Body:

{
  "name": "IoT Sensors HiveMQ",
  "description": "MQTT connection for IoT sensor data",
  "topics": [
    "sensors/temperature",
    "sensors/humidity"
  ],
  "options": {
    "hostname": "abc123.s1.eu.hivemq.cloud",
    "port": 8883,
    "clientId": "iot-gateway-001",
    "username": "mqtt_user",
    "password": "secure_password_here"
  }
}

Response:

{
  "id": "018d1234-5678-7abc-def0-123456789abc",
  "name": "IoT Sensors HiveMQ",
  "description": "MQTT connection for IoT sensor data",
  "topics": [
    "sensors/temperature",
    "sensors/humidity"
  ],
  "options": {
    "type": "CloudHiveMq",
    "hostname": "abc123.s1.eu.hivemq.cloud",
    "port": 8883,
    "clientId": "iot-gateway-001",
    "username": "mqtt_user",
    "password": "***"
  },
  "createdTime": "2024-01-15T10:30:00Z"
}

PUT /api/mqtt/{id}

Update an existing MQTT configuration.

Path Parameters:

  • id (Guid, required): MQTT configuration ID

Request Body (all fields optional):

{
  "name": "Updated MQTT Name",
  "description": "Updated description",
  "topics": [
    "sensors/all"
  ],
  "hostname": "new-broker.hivemq.cloud",
  "port": 8883,
  "clientId": "new-client-id",
  "username": "new_user",
  "password": "new_password"
}

Response:

{
  "id": "018d1234-5678-7abc-def0-123456789abc",
  "name": "Updated MQTT Name",
  "description": "Updated description",
  "topics": [
    "sensors/all"
  ],
  "options": {
    "type": "CloudHiveMq",
    "hostname": "new-broker.hivemq.cloud",
    "port": 8883,
    "clientId": "new-client-id",
    "username": "new_user",
    "password": "***"
  },
  "createdTime": "2024-01-15T10:30:00Z",
  "updatedTime": "2024-01-20T15:00:00Z"
}

Note: Only send the fields you want to update. Null/missing fields won't be updated.


DELETE /api/mqtt/{id}

Delete an MQTT configuration.

Path Parameters:

  • id (Guid, required): MQTT configuration ID

Response: 204 No Content


POST /api/mqtt/{id}/connection

Start an MQTT connection (connect to the broker and subscribe to topics).

Path Parameters:

  • id (Guid, required): MQTT configuration ID

Response: 200 OK

Effects:

  • Connects to the MQTT broker specified in the configuration
  • Subscribes to all configured topics
  • Updates connection status to "Connected" in the database

DELETE /api/mqtt/{id}/connection

Stop an MQTT connection (disconnect from the broker).

Path Parameters:

  • id (Guid, required): MQTT configuration ID

Response: 200 OK

Effects:

  • Disconnects from the MQTT broker
  • Unsubscribes from all topics
  • Updates connection status to "Disconnected" in the database

V. Advanced Configuration

Multiple Message Parsers

You can register multiple message parsers to handle different message formats:

options
    .WithMqttMessageParser<SensorDataParser>()
    .WithMqttMessageParser<DeviceStatusParser>()
    .WithMqttMessageParser<AlertMessageParser>();

The MQTT message service will try each parser in order until one successfully parses the message.

Multiple Message Processors

You can register multiple message processors for different processing pipelines:

options
    .WithMqttMessageProcessor<LoggingProcessor>()        // First: Log all messages
    .WithMqttMessageProcessor<ValidationProcessor>()     // Second: Validate data
    .WithMqttMessageProcessor<DataStorageProcessor>()    // Third: Store in database
    .WithMqttMessageProcessor<AlertingProcessor>();      // Fourth: Send alerts

All processors with CanExecuteAsync returning true will process each message.

Connection Lifecycle

The MQTT module includes hosted services that automatically manage connection lifecycles:

  • MqttHostedService: Manages the overall MQTT service lifecycle
  • MqttConnectionHostedService: Monitors and maintains active MQTT connections

VI. Configuration File Example

appsettings.json

{
  "Mqtt": {
    "MongoDatabase": {
      "ConnectionString": "mongodb://localhost:27017",
      "DatabaseName": "mqtt_db"
    }
  },
  "Logging": {
    "LogLevel": {
      "ApiFeatures.Mqtt.Apis": "Debug"
    }
  }
}

Program.cs with Configuration Binding

var mongoOptions = builder.Configuration
    .GetSection("Mqtt:MongoDatabase")
    .Get<MongoMqttDatabaseOptions>()!;

options.WithDatabaseOptions(mongoOptions);

VII. Dependencies

Required Packages

  • MongoDB.Driver (3.5.1+)
  • MQTTnet (4.3.7+)
  • FluentValidation (12.1.1+)
  • Newtonsoft.Json (13.0.4+)
  • IotVn.CoreFeatures (9.1.0+)
  • ApiFeatures.Modules.Cores (9.0.1+)
  • BusinessFeatures.Cores (latest)

Framework

  • .NET 9.0

VIII. Features

Automatic Connection Management

  • Auto-start on demand: Start MQTT connections via API endpoints
  • Auto-reconnect: Automatic reconnection on connection loss
  • Graceful shutdown: Proper cleanup when stopping connections

Thread-Safe Operations

  • All MQTT operations are thread-safe
  • Concurrent message processing is handled safely

GUID Serialization

  • Uses GuidRepresentation.CSharpLegacy for MongoDB compatibility
  • Automatically configures GUID serialization on database context initialization

Validation

All API endpoints use FluentValidation:

  • MQTT configuration validation
  • Unique MQTT configuration names
  • Required fields validation
  • Connection parameter validation

Security

  • Passwords are masked in API responses (displayed as ***)
  • Secure TLS connections to MQTT brokers
  • Username/password authentication support

IX. Error Handling

Common Exceptions

Exception When Thrown
MqttByIdNotFoundException MQTT configuration with specified ID not found
ValidationException Request validation failed
MqttConnectionException Failed to connect to MQTT broker

X. Service Keys

The module uses keyed services for dependency injection:

Service Key Lifetime
IValidator<*> ServiceKeys.Default Scoped
IJsonTool ServiceKeys.Default Singleton
MongoMqttDatabaseContext ServiceKeys.Default Scoped
IMqttService ServiceKeys.Default Scoped
IMqttConnectionService ServiceKeys.Default Scoped
IHiveMqService ServiceKeys.Default Singleton
IMqttMessageParser ServiceKeys.Default Singleton
IMqttMessageProcessor ServiceKeys.Default Scoped

ServiceKeys Values:

  • ServiceKeys.Default = "ApiFeatures.Mqtt.Apis"

XI. Message Flow

Incoming MQTT Message Processing

  1. Message Received: MQTT client receives message from subscribed topic
  2. Message Parsing: Each registered parser attempts to parse the raw message
  3. Message Processing: All processors with CanExecuteAsync returning true process the message
  4. Error Handling: Errors are logged but don't stop other processors
MQTT Broker 
    ↓
HiveMqService (receives raw message)
    ↓
MqttMessageService
    ↓
┌─────────────────────────┐
│ Message Parsers         │
│ (try in order)          │
│  - SensorDataParser     │
│  - DeviceStatusParser   │
│  - AlertMessageParser   │
└─────────────────────────┘
    ↓
┌─────────────────────────┐
│ Message Processors      │
│ (all with CanExecute)   │
│  - LoggingProcessor     │
│  - ValidationProcessor  │
│  - DataStorageProcessor │
│  - AlertingProcessor    │
└─────────────────────────┘

XII. Best Practices

1. Parser Design

  • Keep parsers focused on a single message format
  • Return null if the parser can't handle the message
  • Log parsing errors for debugging
  • Use strongly-typed MqttMessage<T> for parsed messages

2. Processor Design

  • Implement CanExecuteAsync to filter messages efficiently
  • Keep processors independent and stateless when possible
  • Handle exceptions gracefully
  • Log processing activities for monitoring

3. Connection Management

  • Use descriptive names for MQTT configurations
  • Document topic patterns in descriptions
  • Test connections before deploying
  • Monitor connection status regularly

4. Security

  • Store MQTT credentials securely (use secrets management)
  • Use TLS/SSL connections to brokers (port 8883)
  • Rotate credentials periodically
  • Limit topic subscriptions to necessary topics only

XIII. License

[Your License Here]


XIV. Contributing

[Your Contributing Guidelines Here]

Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
9.0.1 36 5/7/2026