MQTTnet.Rx.SerialPort 4.0.2

dotnet add package MQTTnet.Rx.SerialPort --version 4.0.2
                    
NuGet\Install-Package MQTTnet.Rx.SerialPort -Version 4.0.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MQTTnet.Rx.SerialPort" Version="4.0.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="MQTTnet.Rx.SerialPort" Version="4.0.2" />
                    
Directory.Packages.props
<PackageReference Include="MQTTnet.Rx.SerialPort" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add MQTTnet.Rx.SerialPort --version 4.0.2
                    
#r "nuget: MQTTnet.Rx.SerialPort, 4.0.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package MQTTnet.Rx.SerialPort@4.0.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=MQTTnet.Rx.SerialPort&version=4.0.2
                    
Install as a Cake Addin
#tool nuget:?package=MQTTnet.Rx.SerialPort&version=4.0.2
                    
Install as a Cake Tool

License Build

MQTTnet.Rx.Client

Nuget NuGet

MQTTnet.Rx.Server

Nuget NuGet

MQTTnet.Rx.ABPlc

Nuget NuGet

MQTTnet.Rx.Modbus

Nuget NuGet

MQTTnet.Rx.S7Plc

Nuget NuGet

MQTTnet.Rx.SerialPort

Nuget NuGet

MQTTnet.Rx.TwinCAT

Nuget NuGet

Alt

<p align="left"> <a href="https://github.com/ChrisPulman/MQTTnet.Rx"> <img alt="MQTTnet.Rx" src="https://github.com/ChrisPulman/MQTTnet.Rx/blob/main/Images/logo.png" width="200"/> </a> </p>

MQTTnet.Rx

Reactive extensions and helpers for MQTTnet (v5) that make it simple to build event-driven MQTT clients and servers using IObservable streams.

  • Targets .NET 8, .NET 9, and .NET 10
  • Based on MQTTnet 5.x and System.Reactive
  • Client and Server wrappers with rich observable APIs
  • Auto-reconnect Resilient client (replacement for ManagedClient)
  • Topic discovery, filtering, and JSON helpers
  • Low-allocation memory-efficient extensions for high-throughput scenarios
  • Last Will and Testament (LWT) configuration helpers
  • Optional integration packages (Modbus, SerialPort, S7 PLC, Allen-Bradley PLC, TwinCAT)

Note on ManagedClient: Support for ManagedClient is removed because MQTTnet v5 no longer includes it. Use the Resilient client in MQTTnet.Rx.Client instead.

Packages

  • MQTTnet.Rx.Client – Reactive MQTT client helpers (raw and resilient)
  • MQTTnet.Rx.Server – Reactive MQTT server helpers
  • MQTTnet.Rx.Modbus – Publish Modbus values via MQTT using ModbusRx.Reactive
  • MQTTnet.Rx.SerialPort – Publish/consume serial data via MQTT with CP.IO.Ports
  • MQTTnet.Rx.S7Plc – Publish/subscribe S7 PLC tags via MQTT with S7PlcRx
  • MQTTnet.Rx.ABPlc – Publish/subscribe Allen-Bradley PLC tags via MQTT with ABPlcRx
  • MQTTnet.Rx.TwinCAT – Publish/subscribe TwinCAT tags via MQTT with CP.TwinCatRx

Install

# Pick what you need
 dotnet add package MQTTnet.Rx.Client
 dotnet add package MQTTnet.Rx.Server
 dotnet add package MQTTnet.Rx.Modbus
 dotnet add package MQTTnet.Rx.SerialPort
 dotnet add package MQTTnet.Rx.S7Plc
 dotnet add package MQTTnet.Rx.ABPlc
 dotnet add package MQTTnet.Rx.TwinCAT

Quick start – Client (raw)

Publish an observable stream and subscribe to a topic.

using System.Reactive.Subjects;
using MQTTnet.Rx.Client;

var messages = new Subject<(string topic, string payload)>();

// Connect and publish
var publishSub = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishMessage(messages)
    .Subscribe(r => Console.WriteLine($"PUB: {r.ReasonCode} [{r.PacketIdentifier}]"));

// Subscribe to a topic (supports wildcards + and #)
var subscribeSub = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/+/temp")
    .Subscribe(m => Console.WriteLine($"SUB: {m.ApplicationMessage.Topic} => {m.ApplicationMessage.ConvertPayloadToString()}"));

// Emit some messages
messages.OnNext(("sensors/kitchen/temp", "21.5"));
messages.OnNext(("sensors/lab/temp", "19.9"));

Quick start – Resilient client (auto-reconnect)

The resilient client stays connected and queues outbound messages while reconnecting.

using System.Reactive.Subjects;
using MQTTnet.Rx.Client;

var messages = new Subject<(string topic, string payload)>();

var resilientPubSub = Create.ResilientMqttClient()
    .WithResilientClientOptions(o =>
        o.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
         .WithClientOptions(c => c.WithTcpServer("localhost", 1883).WithClientId("app-1")))
    .PublishMessage(messages)
    .Subscribe(e => Console.WriteLine($"PUB: {e.ApplicationMessage.Id} [{e.ApplicationMessage.ApplicationMessage?.Topic}]"));

var resilientSub = Create.ResilientMqttClient()
    .WithResilientClientOptions(o =>
        o.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
         .WithClientOptions(c => c.WithTcpServer("localhost", 1883).WithClientId("app-2")))
    .SubscribeToTopic("devices/#")
    .Subscribe(m => Console.WriteLine($"SUB: {m.ApplicationMessage.Topic} => {m.ApplicationMessage.ConvertPayloadToString()}"));

// Emit messages
messages.OnNext(("devices/alpha", "hello"));

Resilient client event streams

var client = Create.ResilientMqttClient()
    .WithResilientClientOptions(o => o.WithClientOptions(c => c.WithTcpServer("localhost", 1883)));

var events = client.Subscribe(c =>
{
    var d1 = c.Connected.Subscribe(_ => Console.WriteLine("Connected"));
    var d2 = c.Disconnected.Subscribe(_ => Console.WriteLine("Disconnected"));
    var d3 = c.ApplicationMessageReceived.Subscribe(m => Console.WriteLine($"RX: {m.ApplicationMessage.Topic}"));
    var d4 = c.ConnectingFailed.Subscribe(e => Console.WriteLine($"Connection failed: {e.Exception?.Message}"));
    var d5 = c.ConnectionStateChanged.Subscribe(_ => Console.WriteLine($"State changed: Connected={c.IsConnected}"));
});

Reactive Client Operations

The library provides reactive wrappers for common MQTT client operations:

Ping and Keep-Alive

// Send a single ping
var pingSub = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .Ping()
    .Subscribe(_ => Console.WriteLine("Pong received"));

// Periodic pings to maintain connection
var keepAliveSub = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PingPeriodically(TimeSpan.FromSeconds(30))
    .Subscribe(_ => Console.WriteLine("Keep-alive ping"));

Subscribe and Unsubscribe Operations

using MQTTnet.Protocol;

var client = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883));

// Subscribe to multiple topics with QoS
var subResult = client
    .Subscribe(new[] { "topic1", "topic2" }, MqttQualityOfServiceLevel.AtLeastOnce)
    .Subscribe(result => Console.WriteLine($"Subscribed: {result.Items.Count} topics"));

// Subscribe with custom topic filter
var customSub = client
    .Subscribe(f => f.WithTopic("sensors/#").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce))
    .Subscribe(result => Console.WriteLine("Custom subscription done"));

// Unsubscribe from topics
var unsubResult = client
    .Unsubscribe("topic1", "topic2")
    .Subscribe(result => Console.WriteLine($"Unsubscribed: {result.Items.Count} topics"));

Connection Management

var client = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883));

// Monitor connection status
var statusSub = client
    .ConnectionStatus()
    .Subscribe(isConnected => Console.WriteLine($"Connected: {isConnected}"));

// Wait for connection with timeout
var waitSub = client
    .WaitForConnection(TimeSpan.FromSeconds(10))
    .Subscribe(c => Console.WriteLine("Client is now connected"));

// Disconnect gracefully
var disconnectSub = client
    .Disconnect(MqttClientDisconnectOptionsReason.NormalDisconnection)
    .Subscribe(_ => Console.WriteLine("Disconnected"));

// Reconnect using previous options
var reconnectSub = client
    .Reconnect()
    .Subscribe(_ => Console.WriteLine("Reconnected"));

Publish Operations

using MQTTnet.Protocol;

var client = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883));

// Simple publish with string payload
var pub1 = client
    .Publish("topic/test", "Hello World", MqttQualityOfServiceLevel.AtLeastOnce, retain: false)
    .Subscribe(result => Console.WriteLine($"Published: {result.ReasonCode}"));

// Publish with byte array payload
var pub2 = client
    .Publish("topic/binary", new byte[] { 0x01, 0x02, 0x03 })
    .Subscribe(result => Console.WriteLine($"Published binary: {result.ReasonCode}"));

// Publish with custom message builder
var pub3 = client
    .Publish(builder => builder
        .WithTopic("topic/custom")
        .WithPayload("Custom message")
        .WithUserProperty("app", "demo")
        .WithContentType("text/plain"))
    .Subscribe(result => Console.WriteLine($"Published custom: {result.ReasonCode}"));

// Publish multiple messages from an observable stream
var messageStream = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => Create.MqttFactory.CreateApplicationMessageBuilder()
        .WithTopic($"topic/seq/{i}")
        .WithPayload($"Message {i}")
        .Build());

var pub4 = client
    .PublishMany(messageStream)
    .Subscribe(result => Console.WriteLine($"Published from stream: {result.ReasonCode}"));

JSON helpers and topic discovery

Use ToDictionary to parse JSON payloads and Observe to extract keys as typed streams.

using MQTTnet.Rx.Client;

var d = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("telemetry/#")
    .ToDictionary()
    .Subscribe(dict =>
    {
        if (dict != null)
        {
            Console.WriteLine(string.Join(", ", dict.Select(kv => $"{kv.Key}={kv.Value}")));
        }
    });

// Observe specific keys with conversion helpers
var temperature = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("telemetry/room1")
    .ToDictionary()
    .Observe("temperature")
    .ToDouble()
    .Subscribe(t => Console.WriteLine($"Temp: {t:0.0}"));

Type Conversion Helpers

// Available conversion methods for Observe() results:
.ToBool()    // Convert to boolean
.ToByte()    // Convert to byte
.ToInt16()   // Convert to short
.ToInt32()   // Convert to int
.ToInt64()   // Convert to long
.ToSingle()  // Convert to float
.ToDouble()  // Convert to double

Deserialize to Typed Objects

using Newtonsoft.Json;

public class SensorReading
{
    public string SensorId { get; set; }
    public double Temperature { get; set; }
    public DateTime Timestamp { get; set; }
}

var readings = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/+/reading")
    .ToObject<SensorReading>()
    .Subscribe(reading => Console.WriteLine($"Sensor {reading?.SensorId}: {reading?.Temperature}°C"));

// With custom JSON settings
var settings = new JsonSerializerSettings { DateFormatString = "yyyy-MM-ddTHH:mm:ssZ" };
var readingsCustom = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/+/reading")
    .ToObject<SensorReading>(settings)
    .Subscribe(reading => Console.WriteLine($"Custom: {reading?.Timestamp}"));

📋 Topic Filtering Extensions

Advanced topic filtering and extraction capabilities:

Match Multiple Topic Patterns

// Match any of several patterns
var multiMatch = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("#")
    .WhereTopicMatchesAny("sensors/+/temp", "devices/+/status", "alerts/#")
    .Subscribe(m => Console.WriteLine($"Matched: {m.ApplicationMessage.Topic}"));

// Exclude specific patterns
var excluded = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/#")
    .WhereTopicIsNotMatch("sensors/+/debug")
    .Subscribe(m => Console.WriteLine($"Non-debug: {m.ApplicationMessage.Topic}"));

Extract Values from Topic Patterns

// Extract named values from topic levels
var extracted = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/+/readings/+")
    .ExtractTopicValues("sensors/{sensorId}/readings/{type}")
    .Subscribe(x =>
    {
        Console.WriteLine($"Sensor: {x.Values["sensorId"]}, Type: {x.Values["type"]}");
        Console.WriteLine($"Payload: {x.Message.ApplicationMessage.ConvertPayloadToString()}");
    });

Filter by Topic Level Count

// Only messages with exactly 3 topic levels
var threeLevel = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("#")
    .WhereTopicLevelCount(3)
    .Subscribe(m => Console.WriteLine($"3-level topic: {m.ApplicationMessage.Topic}"));

Extract Specific Topic Levels

// Get the second topic level (index 1)
var deviceIds = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("devices/+/status")
    .SelectTopicLevel(1)
    .Distinct()
    .Subscribe(deviceId => Console.WriteLine($"Device ID: {deviceId}"));

Group Messages by Topic

// Group messages by their full topic
var grouped = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/#")
    .GroupByTopic()
    .Subscribe(group =>
    {
        Console.WriteLine($"New group for topic: {group.Key}");
        group.Subscribe(m => Console.WriteLine($"  Message: {m.ApplicationMessage.ConvertPayloadToString()}"));
    });

// Group by a specific topic level
var groupedByLevel = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("sensors/+/+")
    .GroupByTopicLevel(1) // Group by sensor ID
    .Subscribe(group =>
    {
        Console.WriteLine($"Messages for sensor: {group.Key}");
        group.Subscribe(m => Console.WriteLine($"  {m.ApplicationMessage.Topic}"));
    });

🚀 Memory-Efficient Extensions

For high-throughput scenarios, use the low-allocation extensions in MQTTnet.Rx.Client.MemoryEfficient:

using MQTTnet.Rx.Client.MemoryEfficient;

var client = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("data/#");

// Zero-copy payload access with pooled buffers
var pooledSub = client
    .ToPooledPayload()
    .Subscribe(data =>
    {
        // Process the buffer
        Console.WriteLine($"Received {data.Length} bytes");
        
        // IMPORTANT: Return the buffer to the pool when done
        data.ReturnBuffer();
    });

// Get payload length without allocation
var lengthSub = client
    .GetPayloadLength()
    .Subscribe(len => Console.WriteLine($"Payload size: {len} bytes"));

// Convert to byte array (when you need to keep the data)
var arraySub = client
    .ToPayloadArray()
    .Subscribe(bytes => Console.WriteLine($"Array: {bytes.Length} bytes"));

// Low-allocation UTF-8 string decoding
var stringSub = client
    .ToUtf8StringLowAlloc()
    .Subscribe(s => Console.WriteLine($"Message: {s}"));

Batching and Throttling

using MQTTnet.Rx.Client.MemoryEfficient;

// Batch messages by time window
var batchedByTime = client
    .BatchProcess(
        TimeSpan.FromSeconds(1),
        batch => $"Processed {batch.Count} messages")
    .Subscribe(result => Console.WriteLine(result));

// Batch messages by count
var batchedByCount = client
    .BatchProcess(
        count: 100,
        batch => batch.Sum(m => m.ApplicationMessage.Payload.Length))
    .Subscribe(totalBytes => Console.WriteLine($"Batch total: {totalBytes} bytes"));

// Throttle high-frequency messages
var throttled = client
    .ThrottleMessages(TimeSpan.FromMilliseconds(100))
    .Subscribe(m => Console.WriteLine($"Throttled: {m.ApplicationMessage.Topic}"));

// Sample messages at intervals
var sampled = client
    .SampleMessages(TimeSpan.FromSeconds(1))
    .Subscribe(m => Console.WriteLine($"Sample: {m.ApplicationMessage.Topic}"));

Back-Pressure Handling

using MQTTnet.Rx.Client.MemoryEfficient;

// Drop messages when subscriber is slow
var dropSub = client
    .WithBackPressureDrop(onDrop: m => Console.WriteLine($"Dropped: {m.ApplicationMessage.Topic}"))
    .Subscribe(m => 
    {
        // Slow processing
        Thread.Sleep(100);
        Console.WriteLine($"Processed: {m.ApplicationMessage.Topic}");
    });

// Queue messages with overflow handling
var queueSub = client
    .WithBackPressureQueue(
        maxQueueSize: 1000,
        onOverflow: m => Console.WriteLine($"Queue overflow, dropped: {m.ApplicationMessage.Topic}"))
    .Subscribe(m => Console.WriteLine($"Queued: {m.ApplicationMessage.Topic}"));

Efficient Topic Filtering

using MQTTnet.Rx.Client.MemoryEfficient;

// Span-based prefix matching (no string allocation)
var prefixSub = client
    .WhereTopicStartsWith("sensors/")
    .Subscribe(m => Console.WriteLine($"Sensor message: {m.ApplicationMessage.Topic}"));

// Span-based suffix matching
var suffixSub = client
    .WhereTopicEndsWith("/status")
    .Subscribe(m => Console.WriteLine($"Status message: {m.ApplicationMessage.Topic}"));

// Observe on thread pool to avoid blocking MQTT client
var threadPoolSub = client
    .ObserveOnThreadPool()
    .Subscribe(m => 
    {
        // Heavy processing won't block MQTT receive loop
        ProcessMessage(m);
    });

📚 Advanced publish options

All publish helpers accept QoS and retain flags, with overloads for string or byte[] payloads.

using MQTTnet.Protocol;

var msgs = new Subject<(string topic, string payload)>();

var pub = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishMessage(msgs, qos: MqttQualityOfServiceLevel.AtLeastOnce, retain: false)
    .Subscribe(r => Console.WriteLine($"Sent: {r.ReasonCode}"));

// Custom message builder (add user properties etc.)
var pub2 = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishMessage(
        msgs,
        messageBuilder: b => b.WithUserProperty("app", "demo"))
    .Subscribe();

Binary payloads (byte[])

using System.Reactive.Subjects;
using MQTTnet.Rx.Client;

// Publish byte[] payloads with raw client
var bytes = new Subject<(string topic, byte[] payload)>();
var pubBytes = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishMessage(bytes)
    .Subscribe();

bytes.OnNext(("images/frame", new byte[] { 0x01, 0x02, 0x03 }));

// Subscribe and access raw bytes (zero-copy helpers)
var subBytes = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("images/#")
    .Subscribe(m =>
    {
        // Avoid ToArray: use Payload() or PayloadUtf8()
        var payload = m.Payload(); // ReadOnlySequence<byte>
        Console.WriteLine($"Got {payload.Length} bytes from {m.ApplicationMessage.Topic}");
    });

// Resilient client publish with byte[]
var pubBytesResilient = Create.ResilientMqttClient()
    .WithResilientClientOptions(o => o.WithClientOptions(c => c.WithTcpServer("localhost", 1883)))
    .PublishMessage(bytes)
    .Subscribe();

🔔 Last Will and Testament (LWT)

Configure Last Will messages that are published when the client disconnects unexpectedly:

using MQTTnet.Protocol;
using MQTTnet.Rx.Client;

// Simple string LWT
var client1 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("localhost", 1883)
        .WithClientId("device-001")
        .WithLastWill("devices/device-001/status", "offline", MqttQualityOfServiceLevel.AtLeastOnce, retain: true))
    .Subscribe();

// Binary LWT
var client2 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("localhost", 1883)
        .WithLastWill("devices/device-002/status", new byte[] { 0x00 }))
    .Subscribe();

// JSON LWT with typed object
public class DeviceStatus
{
    public string Status { get; set; }
    public DateTime Timestamp { get; set; }
}

var client3 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("localhost", 1883)
        .WithLastWillJson("devices/device-003/status", new DeviceStatus 
        { 
            Status = "offline", 
            Timestamp = DateTime.UtcNow 
        }))
    .Subscribe();

// LWT with delay (will delay message delivery)
var client4 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("localhost", 1883)
        .WithLastWill("devices/device-004/status", "offline")
        .WithWillDelayInterval(30)) // 30 second delay
    .Subscribe();

🔐 TLS/SSL configuration (server and client)

This section shows how to enable TLS on the MQTT broker and connect securely from clients using MQTTnet v5.

1) Prepare certificates

  • Create or obtain a server certificate (PFX with private key). For development you can use a self-signed cert.
  • Optionally, create a client certificate (PFX) if you want mutual TLS (client authentication).
  • Ensure the issuing CA (or the self-signed cert) is trusted on the machines running the client(s).

2) Enable TLS on the broker

using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using MQTTnet.Rx.Server;

var serverCert = new X509Certificate2("server.pfx", "pfx-password");

var server = Create.MqttServer(b =>
        b.WithDefaultEndpoint(false)                // disable plain TCP endpoint
         .WithEncryptedEndpoint()                   // enable TLS endpoint
         .WithEncryptedEndpointPort(8883)          // standard MQTT over TLS port
         .WithEncryptionCertificate(serverCert)    // use your server certificate
         .WithEncryptionSslProtocol(SslProtocols.Tls12)
         .Build())
    .Subscribe(sub =>
    {
        sub.Disposable.Add(sub.Server.Started().Subscribe(_ => Console.WriteLine("Secure broker started on 8883")));
        // Optional: validate connecting clients, enforce client certs, etc.
        sub.Disposable.Add(sub.Server.ValidatingConnection().Subscribe(args =>
        {
            // Example: require TLS
            if (!args.IsSecureConnection)
            {
                args.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.ClientIdentifierNotValid;
            }
        }));
    });

Notes:

  • WithDefaultEndpoint(false) disables the unsecured (tcp) listener; omit or set true to keep both.
  • WithEncryptionSslProtocol can be set to Tls13 if available on your platform.

3) Connect a TLS client (server authentication)

using System.Security.Authentication;
using MQTTnet.Rx.Client;

var client = Create.ResilientMqttClient()
    .WithResilientClientOptions(o => o.WithClientOptions(c =>
        c.WithTcpServer("your-broker-host", 8883)
         .WithTlsOptions(tls =>
            tls.WithSslProtocols(SslProtocols.Tls12)
               .WithIgnoreCertificateChainErrors(false)
               .WithIgnoreCertificateRevocationErrors(false))))
    .Subscribe();

// Using the helper extensions
var client2 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("your-broker-host", 8883)
        .WithTlsEnabled()
        .WithTlsProtocols(SslProtocols.Tls12))
    .Subscribe();

4) Connect a TLS client with client certificate (mutual TLS)

using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using MQTTnet.Rx.Client;

var clientCert = new X509Certificate2("client.pfx", "pfx-password");

var client = Create.ResilientMqttClient()
    .WithResilientClientOptions(o => o.WithClientOptions(c =>
        c.WithTcpServer("your-broker-host", 8883)
         .WithTlsOptions(tls =>
            tls.WithSslProtocols(SslProtocols.Tls12)
               .WithCertificates(new[] { clientCert })
               // For development only: accept untrusted/invalid chains
               .WithIgnoreCertificateChainErrors(true)
               .WithIgnoreCertificateRevocationErrors(true))))
    .Subscribe();

// Using the helper extensions
var client2 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("your-broker-host", 8883)
        .WithTlsClientCertificate(clientCert))
    .Subscribe();

// With custom certificate validation
var client3 = Create.MqttClient()
    .WithClientOptions(c => c
        .WithTcpServer("your-broker-host", 8883)
        .WithTlsCertificateValidation(args =>
        {
            // Custom validation logic
            Console.WriteLine($"Validating cert: {args.Certificate.Subject}");
            return true; // Accept
        }))
    .Subscribe();

Tips:

  • On production, install and trust the CA certificate instead of ignoring validation.
  • When using mutual TLS, validate the client certificate on the server (via ValidatingConnection or a custom connection validator).

🔄 WhenReady – Gate Pipelines on Connection

Use WhenReady() to ensure operations only proceed when the resilient client is connected:

using MQTTnet.Rx.Client;

var client = Create.ResilientMqttClient()
    .WithResilientClientOptions(o => o.WithClientOptions(c => c.WithTcpServer("localhost", 1883)));

// Only process when connected
var readySub = client
    .WhenReady()
    .Subscribe(c =>
    {
        Console.WriteLine("Client ready, starting operations...");
        // Start publishing, subscribing, etc.
    });

// Combine with other operations
var safePub = client
    .WhenReady()
    .SelectMany(c => c.ApplicationMessageReceived)
    .Subscribe(m => Console.WriteLine($"Safe receive: {m.ApplicationMessage.Topic}"));

Retained messages (server-side and client behavior)

  • When publishing with retain = true, the broker stores the last payload for the topic and delivers it to new subscribers.
  • To clear a retained message, publish an empty payload with retain = true to that topic.
  • Broker events for diagnostics are exposed via MQTTnet.Rx.Server.
using MQTTnet.Protocol;
using MQTTnet.Rx.Server;

// Publish retained and later clear it
var msgsRetain = new Subject<(string topic, string payload)>();
var pubRetained = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishMessage(msgsRetain, qos: MqttQualityOfServiceLevel.AtLeastOnce, retain: true)
    .Subscribe();

msgsRetain.OnNext(("status/line1", "online"));
// Clear the retained message (per MQTT spec -> empty payload + retain)
msgsRetain.OnNext(("status/line1", string.Empty));

// Server-side: observe retained message lifecycle
var server = MQTTnet.Rx.Server.Create.MqttServer(b => b.WithDefaultEndpointPort(2883).WithDefaultEndpoint().Build())
    .Subscribe(sub =>
    {
        sub.Disposable.Add(sub.Server.RetainedMessageChanged().Subscribe(e => Console.WriteLine($"Retained changed: {e.ApplicationMessage.Topic}")));
        sub.Disposable.Add(sub.Server.RetainedMessagesCleared().Subscribe(_ => Console.WriteLine("All retained cleared")));
        sub.Disposable.Add(sub.Server.LoadingRetainedMessage().Subscribe(_ => Console.WriteLine("Loading retained at startup")));
    });

Server – Reactive MQTT broker

Spin up an in-process MQTT broker and subscribe to server-side events with IObservable.

using MQTTnet.Rx.Server;

var server = Create.MqttServer(builder =>
        builder.WithDefaultEndpointPort(2883)
               .WithDefaultEndpoint()
               .Build())
    .Subscribe(async sub =>
    {
        // Subscribe to server events
        sub.Disposable.Add(sub.Server.ClientConnected().Subscribe(e => Console.WriteLine($"SERVER: Connected {e.ClientId}")));
        sub.Disposable.Add(sub.Server.ClientDisconnected().Subscribe(e => Console.WriteLine($"SERVER: Disconnected {e.ClientId}")));
        sub.Disposable.Add(sub.Server.InterceptingPublish().Subscribe(e => Console.WriteLine($"SERVER: Publish {e.ApplicationMessage.Topic}")));
    });

🧰 Payload Extensions

Zero-copy and efficient payload access:

using MQTTnet.Rx.Client;

var client = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeToTopic("data/#");

// ReadOnlySequence<byte> - zero-copy access
var seqSub = client
    .Subscribe(e =>
    {
        var payload = e.Payload(); // ReadOnlySequence<byte>
        Console.WriteLine($"Payload length: {payload.Length}");
    });

// UTF-8 string without ToArray()
var utf8Sub = client
    .Subscribe(e =>
    {
        var text = e.PayloadUtf8(); // Efficient UTF-8 decoding
        Console.WriteLine($"Text: {text}");
    });

// Observable projection to UTF-8 strings
var stringSub = client
    .ToUtf8String()
    .Subscribe(text => Console.WriteLine($"Message: {text}"));

🏭 Diagnostics

  • Topic matching: SubscribeToTopic supports wildcards (+ and #). Internally, topics are matched using MqttTopicFilterComparer.
  • Duplicate subscriptions: SubscribeToTopic reference-counts identical topic filters per client and unsubscribes when the last observer disposes.
  • Packet inspection (raw client):
var c = Create.MqttClient().WithClientOptions(o => o.WithTcpServer("localhost", 1883));
var sniff = c.Subscribe(cli => cli.InspectPackage().Subscribe(p => Console.WriteLine($"{p.Direction} {p.Packet}") ));

Integration packages

These packages provide focused helpers to bridge other systems to MQTT using observables. Refer to the respective package documentation for how to create and configure the underlying connections/clients.

Modbus (MQTTnet.Rx.Modbus)

Publish Modbus registers/coils/inputs as MQTT messages. You can provide either a preconfigured Modbus master or a factory.

  • FromMaster(master): wrap an existing ModbusIpMaster
  • FromFactory(() ⇒ new ModbusIpMaster(...)): wrap a factory creating a master

Raw client publish (IMqttClient) with QoS/retain:

using MQTTnet.Rx.Modbus;
using MQTTnet.Protocol;

var modbusState = Create.FromFactory(() => /* create and configure ModbusIpMaster */);

// Input registers
var pubInRegs = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishInputRegisters(modbusState, topic: "modbus/inputregs", startAddress: 0, numberOfPoints: 8, interval: 250, qos: MqttQualityOfServiceLevel.AtLeastOnce, retain: false)
    .Subscribe();

// Holding registers
var pubHold = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishHoldingRegisters(modbusState, topic: "modbus/holdingregs", startAddress: 0, numberOfPoints: 8, interval: 500)
    .Subscribe();

// Inputs (discrete)
var pubInputs = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishInputs(modbusState, topic: "modbus/inputs", startAddress: 0, numberOfPoints: 16, interval: 250)
    .Subscribe();

// Coils
var pubCoils = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishCoils(modbusState, topic: "modbus/coils", startAddress: 0, numberOfPoints: 16, interval: 250)
    .Subscribe();

Resilient client publish (IResilientMqttClient):

using MQTTnet.Rx.Modbus;
using MQTTnet.Protocol;

var modbusState = Create.FromMaster(preconfiguredMaster);

var resilient = Create.ResilientMqttClient()
    .WithResilientClientOptions(o => o.WithClientOptions(c => c.WithTcpServer("localhost", 1883)).WithAutoReconnectDelay(TimeSpan.FromSeconds(2)));

var p1 = resilient.PublishInputRegisters(modbusState, "modbus/inputregs", 0, 8, 250).Subscribe();
var p2 = resilient.PublishHoldingRegisters(modbusState, "modbus/holdingregs", 0, 8, 500).Subscribe();
var p3 = resilient.PublishInputs(modbusState, "modbus/inputs", 0, 16, 250).Subscribe();
var p4 = resilient.PublishCoils(modbusState, "modbus/coils", 0, 16, 250).Subscribe();

Custom payloads (string/byte[]) via factory:

// JSON with additional metadata
var reader = modbusState.ReadHoldingRegisters(0, 8, 500)
    .Select(r => (connected: true, error: (Exception?)null, data: new { ts = DateTime.UtcNow, values = r.data } as object));

var pubCustom = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishModbus(reader, topic: "modbus/custom", payloadFactory: d => JsonConvert.SerializeObject(d))
    .Subscribe();

// Binary payload
var pubBin = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishModbus(reader, topic: "modbus/customBin", payloadFactory: d => Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(d)))
    .Subscribe();

Write values from MQTT to Modbus (subscribe + write):

// Shortcuts with default parsers
var subReg1 = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeWriteSingleRegister(modbusState, topic: "modbus/write/reg/10", address: 10, writer: (m, v) => m.WriteSingleRegister(10, v));

var subRegs = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeWriteMultipleRegisters(modbusState, topic: "modbus/write/regs", startAddress: 0, writer: (m, values) => m.WriteMultipleRegisters(0, values));

var subCoil1 = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeWriteSingleCoil(modbusState, topic: "modbus/write/coil/5", address: 5, writer: (m, v) => m.WriteSingleCoil(5, v));

var subCoils = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeWriteMultipleCoils(modbusState, topic: "modbus/write/coils", startAddress: 0, writer: (m, values) => m.WriteMultipleCoils(0, values));

// Or generic form mapping your own parser/writer
var generic = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .SubscribeWrite(modbusState, topic: "modbus/write/custom", parse: s => ushort.Parse(s), writer: (m, v) => m.WriteSingleRegister(20, v));

Notes:

  • FromMaster and FromFactory help you pass your own configured master or a factory.
  • All publish helpers accept qos/retain.
  • For write operations, use SubscribeWrite* helpers or the generic SubscribeWrite/SubscribeWriteAsync to plug the correct Modbus API calls.

Serial Port (MQTTnet.Rx.SerialPort)

Publish framed serial data to MQTT using CP.IO.Ports ISerialPortRx.

using System;
using System.Reactive.Linq;
using CP.IO.Ports;
using MQTTnet.Rx.SerialPort;

// Create and configure an ISerialPortRx (COM port, baud, parity, etc.)
ISerialPortRx port = /* create and configure an ISerialPortRx */;

var serialPub = Create.MqttClient()
    .WithClientOptions(c => c.WithTcpServer("localhost", 1883))
    .PublishSerialPort(
        topic: "serial/data",
        serialPort: port,
        startsWith: Observable.Return('<'),
        endsWith: Observable.Return('>'),
        timeOut: 1000)
    .Subscribe();

PLCs – S7, Allen-Bradley, TwinCAT

Each PLC package adds helpers to publish or subscribe PLC tags via MQTT using the respective Rx client libraries (S7PlcRx, ABPlcRx, CP.TwinCatRx). Example signatures include:

  • MQTTnet.Rx.S7Plc: PublishS7PlcTag<T>(client, topic, plcVariable, configurePlc)
  • MQTTnet.Rx.ABPlc: PublishABPlcTag<T>(client, topic, plcVariable, configurePlc)
  • MQTTnet.Rx.TwinCAT: PublishTcPlcTag<T>(client, topic, plcVariable, configurePlc)

Refer to those libraries for creating and configuring connected PLC clients, then use the Publish* helpers to push tag changes to MQTT. You can also combine SubscribeToTopic with your PLC client to write incoming values back to tags.


📖 API Reference Summary

Create (Factory Methods)

Method Description
Create.MqttClient() Creates a shared raw MQTT client observable
Create.ResilientMqttClient() Creates a shared resilient MQTT client observable
Create.MqttFactory Gets the default MqttClientFactory instance
.WithClientOptions() Configures and connects the raw client
.WithResilientClientOptions() Configures and starts the resilient client

Subscribe Extensions

Method Description
.SubscribeToTopic(topic) Subscribe to a single topic with wildcards
.SubscribeToTopics(topics...) Subscribe to multiple topics
.ToDictionary() Parse JSON payload to Dictionary
.ToObject<T>() Deserialize JSON to typed object
.Observe(key) Observe a specific key from Dictionary stream

Publish Extensions

Method Description
.PublishMessage(messages) Publish from observable stream
.Publish(topic, payload) Publish single message
.PublishMany(messages) Publish multiple messages

Topic Filter Extensions

Method Description
.WhereTopicIsMatch(filter) Filter by MQTT topic pattern
.WhereTopicMatchesAny(filters...) Match any of multiple patterns
.WhereTopicIsNotMatch(filter) Exclude matching topics
.ExtractTopicValues(pattern) Extract named values from topic
.WhereTopicLevelCount(count) Filter by topic level count
.SelectTopicLevel(index) Extract specific topic level
.GroupByTopic() Group messages by topic
.GroupByTopicLevel(index) Group by specific topic level

Memory-Efficient Extensions

Method Description
.ToPooledPayload() Get payload with pooled buffer
.GetPayloadLength() Get payload length
.ToPayloadArray() Convert to byte array
.ToUtf8StringLowAlloc() Low-allocation UTF-8 decoding
.BatchProcess(timeSpan, processor) Batch by time window
.BatchProcess(count, processor) Batch by count
.ThrottleMessages(duration) Throttle message rate
.SampleMessages(interval) Sample at intervals
.WithBackPressureDrop() Drop when slow
.WithBackPressureQueue() Queue with overflow handling
.WhereTopicStartsWith(prefix) Span-based prefix filter
.WhereTopicEndsWith(suffix) Span-based suffix filter
.ObserveOnThreadPool() Move processing to thread pool

Payload Extensions

Method Description
.Payload() Get ReadOnlySequence<byte>
.PayloadUtf8() Get UTF-8 string
.ToUtf8String() Observable projection to strings

Connection Extensions

Method Description
.WhenReady() Emit when client is connected
.ConnectionStatus() Observable of connection state
.WaitForConnection(timeout) Wait for connection
.Ping() Send ping request
.PingPeriodically(interval) Periodic keep-alive pings
.Disconnect(reason) Graceful disconnect
.Reconnect() Reconnect with previous options

Type Conversion

Method Description
.ToBool() Convert to boolean
.ToByte() Convert to byte
.ToInt16() Convert to short
.ToInt32() Convert to int
.ToInt64() Convert to long
.ToSingle() Convert to float
.ToDouble() Convert to double

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Issues and PRs are welcome.


MQTTnet.Rx - Empowering Industrial Automation with Reactive Technology ⚡🏭

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
4.0.2 82 1/19/2026
3.2.2 91 1/17/2026
3.1.0 214 1/22/2025
3.0.0 219 1/17/2025
2.3.0 207 10/20/2024
2.2.3 191 9/12/2024
2.2.2 201 9/9/2024
2.2.1 217 9/7/2024
2.2.0 264 9/6/2024
2.1.1 219 8/16/2024
2.0.1 228 3/19/2024
1.3.1 279 12/26/2023
1.2.2 261 10/16/2023
1.2.1 206 10/15/2023
1.1.1 223 9/7/2023
1.1.0 281 8/19/2023
0.9.0 278 4/30/2023
0.8.0 337 3/18/2023
0.7.0 426 1/1/2023
0.2.0 492 10/29/2022
0.1.0 483 8/5/2022

Compatability with Net 8 / 9