SerialPortRx 3.4.4
SerialPortRx
A Reactive Serial, TCP, and UDP I/O library that exposes incoming data as IObservable streams and accepts writes via simple methods. Ideal for event-driven, message-framed, and polling scenarios.
Features
- SerialPortRx: Reactive wrapper for System.IO.Ports.SerialPort
- UdpClientRx and TcpClientRx: Reactive wrappers exposing a common IPortRx interface
- Observables:
  - DataReceived: IObservable<char> for serial text flow
  - DataReceivedBytes: IObservable<byte> for raw byte stream (auto-receive mode)
  - Lines: IObservable<string> of complete lines split by NewLine
  - BytesReceived: IObservable<int> for byte stream emitted when using ReadAsync
  - IsOpenObservable: IObservable<bool> for connection state
  - ErrorReceived: IObservable<Exception> for errors
  - PinChanged: IObservable<SerialPinChangedEventArgs> for pin state changes (Windows only)
- Synchronous read methods for manual data consumption
- TCP/UDP batched reads:
  - TcpClientRx.DataReceivedBatches: IObservable<byte[]> chunks per read loop
  - UdpClientRx.DataReceivedBatches: IObservable<byte[]> per received datagram
- Helpers:
  - PortNames(): reactive port enumeration with change notifications
  - BufferUntil(): message framing between start and end delimiters with timeout
  - WhileIsOpen(): periodic observable that fires only while a port is open
- Cross-targeted: netstandard2.0, net8.0, net9.0, net10.0, and Windows-specific TFMs
Installation
- dotnet add package SerialPortRx
Supported target frameworks
- netstandard2.0
- net8.0, net9.0, net10.0
- net8.0-windows10.0.19041.0, net9.0-windows10.0.19041.0, net10.0-windows10.0.19041.0 (adds Windows-only APIs guarded by HasWindows)
Quick start (Serial)
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;
var disposables = new CompositeDisposable();
var port = new SerialPortRx("COM3", 115200) { ReadTimeout = -1, WriteTimeout = -1 };
// Observe connection state and errors
port.IsOpenObservable.Subscribe(isOpen => Console.WriteLine($"Open: {isOpen}")).DisposeWith(disposables);
port.ErrorReceived.Subscribe(ex => Console.WriteLine($"Error: {ex.Message}")).DisposeWith(disposables);
// Raw character stream
port.DataReceived.Subscribe(ch => Console.Write(ch)).DisposeWith(disposables);
await port.Open();
port.WriteLine("AT");
// Close when done
port.Close();
disposables.Dispose();
Discovering serial ports
// Emits the list of available port names whenever it changes
SerialPortRx.PortNames(pollInterval: 500)
.Subscribe(names => Console.WriteLine(string.Join(", ", names)));
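The "whenever it changes" behaviour can be pictured as polling for snapshots and suppressing any snapshot that matches the previous one. The following is a minimal sketch of that idea in plain .NET, with no serial hardware involved. `PortSnapshotFilter` and `Changed` are hypothetical names introduced for illustration, and nothing here is meant to describe how PortNames() is actually implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PortSnapshotFilter
{
    // Yields a snapshot only when it differs from the previously yielded one.
    // The comparison is order-sensitive and ignores case (assumption for this sketch).
    public static IEnumerable<string[]> Changed(IEnumerable<string[]> snapshots)
    {
        var hasPrevious = false;
        var previous = Array.Empty<string>();
        foreach (var current in snapshots)
        {
            if (!hasPrevious || !previous.SequenceEqual(current, StringComparer.OrdinalIgnoreCase))
            {
                hasPrevious = true;
                previous = current;
                yield return current;
            }
        }
    }
}
```

Feeding the snapshots `[COM1]`, `[COM1]`, `[COM1, COM3]`, `[COM1]` through `Changed` yields three snapshots, because the repeated `[COM1]` is suppressed.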
To auto-connect when a specific COM port appears:
var target = "COM3";
var comDisposables = new CompositeDisposable();
SerialPortRx.PortNames()
.Do(names =>
{
if (comDisposables.Count == 0 && Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
var port = new SerialPortRx(target, 115200);
port.DisposeWith(comDisposables);
port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comDisposables);
port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{target}: {(open ? "Open" : "Closed")}"))
.DisposeWith(comDisposables);
port.Open();
}
else if (!Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
comDisposables.Clear(); // auto-cleanup if device removed; Clear() disposes the contents but keeps the container reusable
}
})
.ForEach()
.Subscribe();
Message framing with BufferUntil
BufferUntil helps extract framed messages from the character stream between a start and end delimiter within a timeout.
// Example: messages start with '!' and end with '\n' and must complete within 100ms
var start = 0x21.AsObservable(); // '!'
var end = 0x0a.AsObservable(); // '\n'
port.DataReceived
.BufferUntil(start, end, timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
A variant returns a default message on timeout:
port.DataReceived
.BufferUntil(start, end, defaultValue: Observable.Return("<timeout>"), timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
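To make the framing idea concrete, here is a minimal sketch of start/end delimiter extraction over a plain character sequence. It deliberately ignores the timeout, and it excludes the delimiters from each emitted message, which is an assumption of this sketch and not a statement about what BufferUntil emits. `FrameExtractor` and `Extract` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Text;

public static class FrameExtractor
{
    // Returns the text found between each start delimiter and the next end delimiter.
    public static IEnumerable<string> Extract(IEnumerable<char> input, char start, char end)
    {
        var buffer = new StringBuilder();
        var inFrame = false;
        foreach (var ch in input)
        {
            if (ch == start)
            {
                // A start delimiter always begins a fresh frame, discarding any partial one.
                buffer.Clear();
                inFrame = true;
            }
            else if (inFrame && ch == end)
            {
                // The frame is complete: emit it and wait for the next start delimiter.
                yield return buffer.ToString();
                buffer.Clear();
                inFrame = false;
            }
            else if (inFrame)
            {
                buffer.Append(ch);
            }
            // Characters outside a frame are dropped.
        }
    }
}
```

For the input `"xx!abc\nyy!de\n!partial"` with `'!'` and `'\n'` as delimiters, this yields `abc` and `de`. The trailing `partial` never completes, which is the situation the timeout in BufferUntil exists to handle.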
Periodic work while the port is open
// Write a heartbeat every 500ms but only while the port remains open
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write("PING\n"));
Reading raw bytes with ReadAsync
Use ReadAsync for binary protocols or fixed-length reads. Each byte successfully read is also pushed to BytesReceived.
// Subscribe first so the bytes read below are observed
port.BytesReceived.Subscribe(b => Console.WriteLine($"Byte: {b:X2}"));
var buffer = new byte[64];
int read = await port.ReadAsync(buffer, 0, buffer.Length);
Console.WriteLine($"Read {read} bytes");
Notes:
- DataReceived is a char stream produced from SerialPort.ReadExisting() when EnableAutoDataReceive is true (default).
- DataReceivedBytes emits raw bytes alongside DataReceived in auto-receive mode.
- BytesReceived emits bytes read by your ReadAsync calls (not from ReadExisting()).
- Concurrent ReadAsync calls are serialized internally for safety.
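For fixed-length binary frames, a single read may return fewer bytes than requested, so a small loop that keeps reading until the frame is full can help. The sketch below takes the read operation as a delegate with the same (buffer, offset, count) shape as the ReadAsync call shown above, so it can be exercised without hardware. `FixedLengthReader` and `ReadExactlyAsync` are hypothetical names, and the sketch assumes that a result of 0 means no more data will arrive.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FixedLengthReader
{
    // Calls the supplied read delegate repeatedly until exactly 'length' bytes are collected.
    public static async Task<byte[]> ReadExactlyAsync(
        Func<byte[], int, int, Task<int>> read, int length)
    {
        var frame = new byte[length];
        var filled = 0;
        while (filled < length)
        {
            // Ask only for the bytes still missing, written after those already received.
            var n = await read(frame, filled, length - filled).ConfigureAwait(false);
            if (n <= 0)
            {
                throw new EndOfStreamException(
                    $"Expected {length} bytes but only received {filled}.");
            }

            filled += n;
        }

        return frame;
    }
}
```

Assuming port.ReadAsync returns a Task<int>, usage would look like `await FixedLengthReader.ReadExactlyAsync((b, o, c) => port.ReadAsync(b, o, c), 8)`.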
Automatic vs Manual Data Reception
By default, EnableAutoDataReceive = true automatically feeds incoming data to DataReceived and DataReceivedBytes observables. Set this to false before calling Open() if you want to use synchronous read methods instead.
// Automatic mode (default) - data flows to observables
var port = new SerialPortRx("COM3", 115200);
port.DataReceived.Subscribe(ch => Console.Write(ch));
await port.Open();
// Manual mode - use synchronous reads
var port = new SerialPortRx("COM3", 115200) { EnableAutoDataReceive = false };
await port.Open();
string data = port.ReadExisting();
If you disable auto-receive but later want reactive streaming, call StartDataReception():
port.EnableAutoDataReceive = false;
await port.Open();
// Later, enable reactive streaming manually
var reception = port.StartDataReception(pollingIntervalMs: 10);
port.DataReceived.Subscribe(ch => Console.Write(ch));
// Stop when done
reception.Dispose();
Synchronous Read Methods
When EnableAutoDataReceive = false, use these synchronous methods for manual data consumption:
var port = new SerialPortRx("COM3", 115200) { EnableAutoDataReceive = false, ReadTimeout = 1000 };
await port.Open();
// Read all available data as string
string existing = port.ReadExisting();
// Read a single byte (-1 if none available)
int b = port.ReadByte();
// Read a single character (-1 if none available)
int ch = port.ReadChar();
// Read into a byte buffer
var buffer = new byte[64];
int bytesRead = port.Read(buffer, 0, buffer.Length);
// Read into a char buffer
var charBuffer = new char[64];
int charsRead = port.Read(charBuffer, 0, charBuffer.Length);
// Read until newline (respects NewLine property)
string line = port.ReadLine();
// Read until a specific delimiter
string data = port.ReadTo(">");
Reading lines
Use ReadLineAsync to await a single complete line split by the configured NewLine. Supports single- and multi-character newline sequences and respects ReadTimeout (> 0).
port.NewLine = "\r\n"; // optional: default is "\n"
var line = await port.ReadLineAsync();
Console.WriteLine($"Line: {line}");
You can also pass a CancellationToken:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var line = await port.ReadLineAsync(cts.Token);
ReadToAsync
Read data up to a specific delimiter asynchronously:
// Read until '>' delimiter
var data = await port.ReadToAsync(">");
Console.WriteLine($"Received: {data}");
// With cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var data = await port.ReadToAsync(">", cts.Token);
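When a delimiter may never arrive, it can be convenient to turn cancellation into a fallback value instead of letting the exception propagate. The sketch below wraps any cancellable read in a deadline. `TimedRead` and `ReadOrDefaultAsync` are hypothetical names, and the sketch assumes the wrapped read reports cancellation by throwing OperationCanceledException, which is the usual .NET convention but is not confirmed here for ReadLineAsync or ReadToAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimedRead
{
    // Runs the read with a deadline and returns 'fallback' if the deadline passes first.
    public static async Task<string> ReadOrDefaultAsync(
        Func<CancellationToken, Task<string>> read, TimeSpan deadline, string fallback)
    {
        // The token source cancels itself once the deadline elapses.
        using var cts = new CancellationTokenSource(deadline);
        try
        {
            return await read(cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Also covers TaskCanceledException, which derives from OperationCanceledException.
            return fallback;
        }
    }
}
```

Assuming ReadToAsync returns a Task<string>, usage would look like `await TimedRead.ReadOrDefaultAsync(t => port.ReadToAsync(">", t), TimeSpan.FromSeconds(5), "<timeout>")`.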
Line streaming with Lines
Subscribe to Lines to get a continuous stream of complete lines:
port.NewLine = "\n";
port.Lines.Subscribe(line => Console.WriteLine($"LINE: {line}"));
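Line splitting with a multi-character NewLine has one subtle case: the newline sequence can be split across two incoming chunks. The sketch below shows one way to handle that by buffering incomplete text between calls. `LineSplitter` and `Feed` are hypothetical names, and this is an illustration of the concept in plain .NET, not the library's implementation of Lines.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public sealed class LineSplitter
{
    private readonly StringBuilder _buffer = new StringBuilder();
    private readonly string _newLine;

    public LineSplitter(string newLine)
    {
        if (string.IsNullOrEmpty(newLine))
        {
            throw new ArgumentException("NewLine must not be empty.", nameof(newLine));
        }

        _newLine = newLine;
    }

    // Feeds one chunk of text and returns every line completed by it.
    public IReadOnlyList<string> Feed(string chunk)
    {
        _buffer.Append(chunk);
        var text = _buffer.ToString();
        var lines = new List<string>();
        var startIndex = 0;
        int index;
        while ((index = text.IndexOf(_newLine, startIndex, StringComparison.Ordinal)) >= 0)
        {
            lines.Add(text.Substring(startIndex, index - startIndex));
            startIndex = index + _newLine.Length;
        }

        // Keep the unterminated remainder for the next chunk.
        _buffer.Clear();
        _buffer.Append(text, startIndex, text.Length - startIndex);
        return lines;
    }
}
```

With NewLine set to `"\r\n"`, feeding `"OK\r"` returns no lines, and feeding `"\nERR"` afterwards returns the single line `OK` while `ERR` stays buffered.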
Writing
- port.Write(string text) - Write a string
- port.WriteLine(string text) - Write a string followed by NewLine
- port.Write(byte[] buffer) - Write entire byte array
- port.Write(byte[] buffer, int offset, int count) - Write portion of byte array
- port.Write(char[] buffer) - Write entire char array
- port.Write(char[] buffer, int offset, int count) - Write portion of char array
Modern .NET Write Overloads (net8.0+)
On modern .NET targets, additional Span-based overloads are available:
// Write from ReadOnlySpan<byte>
ReadOnlySpan<byte> data = stackalloc byte[] { 0x01, 0x02, 0x03 };
port.Write(data);
// Write from ReadOnlyMemory<byte>
ReadOnlyMemory<byte> memory = new byte[] { 0x01, 0x02, 0x03 };
port.Write(memory);
// Write from ReadOnlySpan<char>
ReadOnlySpan<char> chars = "Hello".AsSpan();
port.Write(chars);
Error handling and state
- Subscribe to port.ErrorReceived for exceptions and serial errors.
- Subscribe to port.IsOpenObservable to react to open/close transitions.
- Call port.Close() or dispose subscriptions (DisposeWith) to release the port.
Buffer Management
// Discard pending input data
port.DiscardInBuffer();
// Discard pending output data
port.DiscardOutBuffer();
// Check buffer sizes
Console.WriteLine($"Bytes to read: {port.BytesToRead}");
Console.WriteLine($"Bytes to write: {port.BytesToWrite}");
Windows-only: Pin Changed Events
On Windows targets, subscribe to pin state changes:
#if HasWindows
port.PinChanged.Subscribe(args =>
Console.WriteLine($"Pin changed: {args.EventType}"));
#endif
TCP/UDP variants
The TcpClientRx and UdpClientRx classes implement the same IPortRx interface for a similar reactive experience with sockets.
TCP example:
var tcp = new TcpClientRx("example.com", 80);
await tcp.Open();
var req = System.Text.Encoding.ASCII.GetBytes("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n");
tcp.Write(req, 0, req.Length);
var buf = new byte[1024];
var n = await tcp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine(System.Text.Encoding.ASCII.GetString(buf, 0, n));
UDP example:
var udp = new UdpClientRx(12345);
await udp.Open();
var buf = new byte[16];
var n = await udp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine($"UDP read {n} bytes");
Batched receive (TCP/UDP)
Subscribe to batched byte arrays for throughput-sensitive pipelines:
// TCP batched chunks per read loop
new TcpClientRx("example.com", 80).DataReceivedBatches
.Subscribe(chunk => Console.WriteLine($"TCP chunk size: {chunk.Length}"));
// UDP per-datagram batches
new UdpClientRx(12345).DataReceivedBatches
.Subscribe(datagram => Console.WriteLine($"UDP datagram size: {datagram.Length}"));
Threading and scheduling
- The DataReceived and other streams run on the underlying event threads. Use ObserveOn to marshal to a UI or a dedicated scheduler when needed.
- ReadAsync uses a lightweight lock and offloads blocking reads, avoiding CPU spin.
Tips and best practices
- Subscribe before calling Open() to ensure you don't miss events.
- Tune Encoding (default ASCII), BaudRate, Parity, StopBits, and Handshake to match your device.
- Use BufferUntil for delimited protocols. For binary protocols, use ReadAsync with fixed sizes.
- Use Lines when dealing with text protocols; use ReadLineAsync when you need a one-shot line.
- Always dispose subscriptions (DisposeWith) and call Close() when done.
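Because the default Encoding is ASCII, text containing characters outside the 7-bit range will not survive transmission unchanged. In .NET, Encoding.ASCII substitutes `?` for anything it cannot represent. The sketch below is a quick way to check whether a given payload round-trips through an encoding before sending it. `EncodingCheck` and `RoundTrips` are hypothetical names for illustration.

```csharp
using System.Text;

public static class EncodingCheck
{
    // Returns true when every character in 'text' survives encoding and decoding unchanged.
    public static bool RoundTrips(string text, Encoding encoding)
        => encoding.GetString(encoding.GetBytes(text)) == text;
}
```

For example, `EncodingCheck.RoundTrips("25°C", Encoding.ASCII)` is false because the degree sign is replaced, while the same call with `Encoding.UTF8` is true. The right choice depends on what the device on the other end expects.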
Example program (complete)
using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;
internal static class Program
{
private static async System.Threading.Tasks.Task Main()
{
const string comPortName = "COM1";
const string dataToWrite = "DataToWrite";
var dis = new CompositeDisposable();
var startChar = 0x21.AsObservable(); // '!'
var endChar = 0x0a.AsObservable(); // '\n'
var comdis = new CompositeDisposable();
SerialPortRx.PortNames().Do(names =>
{
if (comdis.Count == 0 && names.Contains(comPortName))
{
var port = new SerialPortRx(comPortName, 9600);
port.DisposeWith(comdis);
port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comdis);
port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{comPortName} {(open ? "Open" : "Closed")}"))
.DisposeWith(comdis);
port.DataReceived
.BufferUntil(startChar, endChar, 100)
.Subscribe(data => Console.WriteLine($"Data: {data}"))
.DisposeWith(comdis);
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write(dataToWrite))
.DisposeWith(comdis);
port.Open().Wait();
}
else if (!names.Contains(comPortName))
{
comdis.Clear(); // dispose the port and its subscriptions but keep comdis reusable if the device returns
Console.WriteLine($"Port {comPortName} Disposed");
}
}).ForEach().Subscribe(Console.WriteLine).DisposeWith(dis);
Console.ReadLine();
comdis.Dispose();
dis.Dispose();
}
}
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Sponsorship
If you find this library useful and would like to support its development, consider sponsoring the project on GitHub Sponsors.
SerialPortRx - Empowering Industrial Automation with Reactive Technology
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net8.0-windows10.0.19041 is compatible. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net9.0-windows10.0.19041 is compatible. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. net10.0-windows10.0.19041 is compatible. |
| .NET Framework | net462 is compatible. net463 was computed. net47 was computed. net471 was computed. net472 is compatible. net48 was computed. net481 is compatible. |
- .NETFramework 4.6.2
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Memory (>= 4.6.3)
  - System.Reactive (>= 6.1.0)
- .NETFramework 4.7.2
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Memory (>= 4.6.3)
  - System.Reactive (>= 6.1.0)
- .NETFramework 4.8.1
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Memory (>= 4.6.3)
  - System.Reactive (>= 6.1.0)
- net10.0
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Reactive (>= 6.1.0)
- net10.0-windows10.0.19041
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Reactive (>= 6.1.0)
- net8.0
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Reactive (>= 6.1.0)
- net8.0-windows10.0.19041
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Reactive (>= 6.1.0)
- net9.0
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Reactive (>= 6.1.0)
- net9.0-windows10.0.19041
  - ReactiveUI.Extensions (>= 2.1.1)
  - System.IO.Ports (>= 10.0.2)
  - System.Reactive (>= 6.1.0)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on SerialPortRx:
| Package | Downloads |
|---|---|
| ModbusRx: A Reactive version of NModbus4 | |
| MQTTnet.Rx.SerialPort: Reactive extensions for MQTTnet Broker | |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.4.4 | 149 | 1/16/2026 |
| 3.4.3 | 81 | 1/16/2026 |
| 3.3.2 | 105 | 12/30/2025 |
| 3.2.7 | 476 | 9/18/2025 |
| 3.1.2 | 654 | 8/18/2025 |
| 3.1.1 | 221 | 8/17/2025 |
| 3.0.3 | 2,370 | 11/24/2023 |
| 3.0.2 | 138 | 11/11/2023 |
| 2.3.4 | 1,100 | 2/18/2023 |
| 2.3.2 | 525 | 1/10/2023 |
| 2.3.1 | 440 | 1/10/2023 |
| 2.2.6 | 7,496 | 7/22/2022 |
| 2.1.4 | 591 | 7/21/2022 |
| 2.1.3 | 565 | 7/21/2022 |
| 2.1.2 | 601 | 7/20/2022 |
| 2.1.1 | 866 | 12/13/2021 |
| 1.4.4 | 931 | 5/1/2021 |
| 1.4.3 | 1,074 | 2/24/2020 |
| 1.4.2 | 842 | 1/19/2020 |
| 1.4.1 | 800 | 12/29/2019 |
| 1.4.0 | 833 | 12/29/2019 |
| 1.3.1 | 1,221 | 1/8/2019 |
| 1.3.0 | 1,087 | 10/17/2018 |
| 1.2.0 | 1,033 | 9/19/2018 |
| 1.1.0 | 1,179 | 8/15/2018 |
| 1.0.1 | 1,570 | 6/27/2018 |
| 1.0.0 | 1,556 | 5/28/2018 |
| 0.0.0.5 | 1,566 | 2/15/2018 |
| 0.0.0.4 | 1,580 | 9/29/2016 |
| 0.0.0.3 | 1,386 | 9/27/2016 |
| 0.0.0.2 | 1,328 | 9/26/2016 |
| 0.0.0.1 | 1,313 | 9/25/2016 |
Compatibility with .NET 8/9/10 and netstandard2.0