ReactiveUI.Extensions 2.0.1

ReactiveUI.Extensions

A focused collection of high-value Reactive Extensions (Rx) operators that do not ship with System.Reactive but are commonly needed when building reactive .NET applications.

The goal of this library is to:

  • Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.)
  • Provide pragmatic, allocation-aware helpers for performance-sensitive scenarios
  • Avoid additional dependencies: only System.Reactive is required

Supported Target Frameworks: .NET Standard 2.0, .NET 8, .NET 9, .NET 10.


Table of Contents

  1. Installation
  2. Quick Start
  3. API Catalog
  4. Operator Categories & Examples
  5. Performance Notes
  6. Thread Safety
  7. License
  8. Contributing
  9. Change Log (Excerpt)

Installation

dotnet add package ReactiveUI.Extensions

Alternatively, reference the project directly while developing locally.


Quick Start

using System;
using System.Reactive.Concurrency; // Scheduler.Default
using System.Reactive.Linq;
using ReactiveUI.Extensions;

var source = Observable.Interval(TimeSpan.FromMilliseconds(120))
                       .Take(10)
                       .Select(i => (long?) (i % 3 == 0 ? null : i));

// 1. Filter nulls + convert to a Unit signal.
var signal = source.WhereIsNotNull().AsSignal();

// 2. Add a heartbeat if the upstream goes quiet for 500ms.
var withHeartbeat = source.WhereIsNotNull()
                          .Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default);

// 3. Retry with exponential backoff up to 5 times.
var resilient = Observable.Defer(() =>
        Observable.Throw<long>(new InvalidOperationException("Boom")))
    .RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));

// 4. Conflate bursty updates.
var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default);

using (conflated.Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

API Catalog

Below is the full list of extension methods (grouped logically).
Some overloads omitted for brevity.

Category | Operators
Null & Signal | WhereIsNotNull, AsSignal
Timing & Scheduling | SyncTimer, Schedule (overloads), ScheduleSafe, ThrottleFirst, DebounceImmediate
Inactivity / Liveness | Heartbeat, DetectStale, BufferUntilInactive
Error Handling | CatchIgnore, CatchAndReturn, OnErrorRetry (overloads), RetryWithBackoff
Combining & Aggregation | CombineLatestValuesAreAllTrue, CombineLatestValuesAreAllFalse, GetMax, GetMin, Partition
Logical / Boolean | Not, WhereTrue, WhereFalse
Async / Task | SelectAsyncSequential, SelectLatestAsync, SelectAsyncConcurrent, SubscribeAsync (overloads), SynchronizeSynchronous, SynchronizeAsync, SubscribeSynchronous (overloads)
Backpressure | Conflate
Filtering / Conditional | Filter (Regex), TakeUntil (predicate), WaitUntil
Buffering | BufferUntil, BufferUntilInactive
Transformation & Utility | Shuffle, ForEach, FromArray, Using, While, Start, OnNext (params helper), DoOnSubscribe, DoOnDispose

Operator Categories & Examples

Null / Signal Helpers

IObservable<string?> raw = GetPossiblyNullStream();
IObservable<string> cleaned = raw.WhereIsNotNull();
IObservable<Unit> signal = cleaned.AsSignal();

Timing, Scheduling & Flow Control

// Shared timer for a given period (one underlying timer per distinct TimeSpan)
var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1));

// Delay emission of a single value
42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default)
  .Subscribe(v => Console.WriteLine($"Delayed: {v}"));

// Safe scheduling when a scheduler may be null
IScheduler? maybeScheduler = null;
maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline"));

// ThrottleFirst: allow first item per window, ignore rest
var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50))
                          .ThrottleFirst(TimeSpan.FromMilliseconds(200));

// DebounceImmediate: emit first immediately then debounce rest
var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40))
                          .DebounceImmediate(TimeSpan.FromMilliseconds(250));
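
The "first item per window" rule behind ThrottleFirst can be illustrated without Rx at all. The sketch below is a plain simulation over timestamped events, not the library's implementation. ThrottleFirstSketch is a hypothetical helper, and it assumes that a window opens at each accepted item and that an item arriving exactly when the window ends is accepted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: keeps the first event of each window and drops the rest.
// Assumption: a window starts at the accepted item and lasts `window`.
static List<long> ThrottleFirstSketch(IEnumerable<(TimeSpan At, long Value)> events, TimeSpan window)
{
    var accepted = new List<long>();
    TimeSpan? windowEnd = null;
    foreach (var (at, value) in events)
    {
        if (windowEnd is null || at >= windowEnd)
        {
            accepted.Add(value);
            windowEnd = at + window;
        }
    }
    return accepted;
}

// Events every 50 ms, mimicking Observable.Interval: value i arrives at (i + 1) * 50 ms.
var events = new List<(TimeSpan, long)>();
for (long i = 0; i < 12; i++)
{
    events.Add((TimeSpan.FromMilliseconds((i + 1) * 50), i));
}

var throttledValues = ThrottleFirstSketch(events, TimeSpan.FromMilliseconds(200));
Console.WriteLine(string.Join(", ", throttledValues)); // 0, 4, 8
```

With a 200 ms window over a 50 ms source, roughly every fourth value survives under these assumptions.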

Inactivity / Liveness

// Heartbeat emits IHeartbeat<T> where IsHeartbeat == true during quiet periods
var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400))
                           .Take(5)
                           .Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default);

// DetectStale emits IStale<T>: one stale marker after inactivity, or fresh update wrappers
var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
                           .Take(3)
                           .DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default);

// BufferUntilInactive groups events separated by inactivity
var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20);
var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200));

Error Handling & Resilience

var flaky = Observable.Create<int>(o =>
{
    o.OnNext(1);
    o.OnError(new InvalidOperationException("Fail"));
    return () => { };
});

// Ignore all errors and complete silently
var flakySafe = flaky.CatchIgnore();

// Replace error with a fallback value
var withFallback = flaky.CatchAndReturn(-1);

// Retry only specific exception type with logging
var retried = flaky.OnErrorRetry<int, InvalidOperationException>(ex => Console.WriteLine(ex.Message), retryCount: 3);

// Retry with exponential backoff
var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));
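
To make "exponential backoff" concrete, the sketch below computes a delay schedule for the parameters used above. The source does not state the growth factor RetryWithBackoff applies, so the doubling factor here is an assumption, and BackoffDelays is a hypothetical helper rather than part of the library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: the delay before retry n is initialDelay * factor^n.
// Assumption: factor = 2; the library's actual factor and any cap may differ.
static IReadOnlyList<TimeSpan> BackoffDelays(int maxRetries, TimeSpan initialDelay, double factor = 2.0)
{
    var result = new List<TimeSpan>(maxRetries);
    for (var attempt = 0; attempt < maxRetries; attempt++)
    {
        var ms = initialDelay.TotalMilliseconds * Math.Pow(factor, attempt);
        result.Add(TimeSpan.FromMilliseconds(ms));
    }
    return result;
}

var delays = BackoffDelays(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));
foreach (var delay in delays)
{
    Console.WriteLine(delay.TotalMilliseconds); // 100, 200, 400, 800, 1600
}
```

Under this assumption, five retries starting at 100 ms wait about 3.1 seconds in total before the error is finally surfaced.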

Combining, Partitioning & Logical Helpers

var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0);
var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0);

var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue();
var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse();

var numbers = Observable.Range(1, 10);
var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream

var toggles = a.Not(); // Negate booleans
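
For readers who want to see what these helpers save, here is a rough equivalent built only from standard System.Reactive operators. It is a sketch of the idea, not the library's code: the names flagA, flagB, allTrueSketch, evenSketch and oddSketch are illustrative, and the two Where filters subscribe to the source separately, which the library's Partition may or may not do.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Two single-value boolean streams standing in for `a` and `b`.
var flagA = Observable.Return(true);
var flagB = Observable.Return(false);

// CombineLatest over a collection yields an IList<bool> of the latest values;
// All(...) collapses it into a single "everything is true" flag.
var allTrueSketch = Observable.CombineLatest(new[] { flagA, flagB })
                              .Select(values => values.All(v => v));

// Partition approximated as two complementary Where filters over one cold source.
var numbers = Observable.Range(1, 10);
var evenSketch = numbers.Where(n => n % 2 == 0);
var oddSketch = numbers.Where(n => n % 2 != 0);

bool lastAllTrue = await allTrueSketch.LastAsync();
IList<int> evens = await evenSketch.ToList();
IList<int> odds = await oddSketch.ToList();

Console.WriteLine(lastAllTrue);              // False
Console.WriteLine(string.Join(", ", evens)); // 2, 4, 6, 8, 10
Console.WriteLine(string.Join(", ", odds));  // 1, 3, 5, 7, 9
```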

Async / Task Integration

IObservable<int> inputs = Observable.Range(1, 5);

// Sequential (preserves order)
var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; });

// Latest only (cancels previous)
var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; });

// Limited parallelism
var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2);

// Asynchronous subscription (serializing tasks)
inputs.SubscribeAsync(async i => await Task.Delay(10));

// Synchronous gate: ensures per-item async completion before next is emitted
inputs.SubscribeSynchronous(async i => await Task.Delay(25));
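
The three projection strategies above map onto well-known System.Reactive compositions. The sketch below shows those compositions using standard operators only (FromAsync, Concat, Switch, Merge). Whether the library implements its helpers this way is an assumption, and DoubleAsync is a hypothetical stand-in for real async work.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical async work item.
static async Task<int> DoubleAsync(int i)
{
    await Task.Delay(20);
    return i * 2;
}

var inputs = Observable.Range(1, 5);

// Sequential: Concat subscribes to each task-backed inner sequence only after
// the previous one completes, so results keep the input order.
var sequential = inputs.Select(i => Observable.FromAsync(() => DoubleAsync(i))).Concat();

// Latest only: Switch unsubscribes from the previous inner sequence when a new one arrives.
var latestOnly = inputs.Select(i => Observable.FromAsync(() => DoubleAsync(i))).Switch();

// Limited parallelism: Merge(2) keeps at most two inner sequences subscribed at once.
var limited = inputs.Select(i => Observable.FromAsync(() => DoubleAsync(i))).Merge(2);

IList<int> sequentialResults = await sequential.ToList();
Console.WriteLine(string.Join(", ", sequentialResults)); // 2, 4, 6, 8, 10
```

Only the sequential variant guarantees output order. The Merge-based variant may emit results in completion order, and the Switch-based variant may drop results whose input was superseded.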

Backpressure / Conflation

// Conflate: enforce minimum spacing between emissions while always outputting the most recent value
var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30);
var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default);

Selective & Conditional Emission

// TakeUntil predicate (inclusive)
var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5);

// WaitUntil first match then complete
var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0);
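
"Inclusive" here means the element that satisfies the predicate is itself emitted before the sequence ends. The sketch below shows that behaviour on an ordinary IEnumerable<T>, as an analogue only. TakeUntilInclusive is a hypothetical helper and says nothing about how the library implements the observable version.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical enumerable analogue: yield items up to and including the first match.
static IEnumerable<T> TakeUntilInclusive<T>(IEnumerable<T> source, Func<T, bool> stop)
{
    foreach (var item in source)
    {
        yield return item;           // the matching item is emitted...
        if (stop(item)) yield break; // ...and then the sequence ends
    }
}

var untilFiveValues = TakeUntilInclusive(Enumerable.Range(1, 100), x => x == 5).ToList();
Console.WriteLine(string.Join(", ", untilFiveValues)); // 1, 2, 3, 4, 5

// The WaitUntil example corresponds to taking just the first match.
var firstEvenValue = Enumerable.Range(1, 10).First(x => x % 2 == 0);
Console.WriteLine(firstEvenValue); // 2
```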

Buffering & Transformation

// BufferUntil - collect chars between delimiters
var chars = "<a><bc><d>".ToCharArray().ToObservable();
var frames = chars.BufferUntil('<', '>'); // emits "<a>", "<bc>", "<d>"

// Shuffle arrays in-place
var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 });
var shuffled = arrays.Shuffle();
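
The delimiter-based framing shown for BufferUntil can be pictured as a small state machine. The sketch below applies it to a plain string and is not the library's implementation. FramesBetween is a hypothetical helper, and it assumes that characters outside a start/end pair are dropped, which the source does not specify.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: collect characters from a start delimiter through the
// matching end delimiter. Assumption: characters outside a frame are ignored.
static List<string> FramesBetween(string input, char start, char end)
{
    var frames = new List<string>();
    var current = new StringBuilder();
    var inFrame = false;
    foreach (var c in input)
    {
        if (c == start) { current.Clear(); inFrame = true; }
        if (!inFrame) continue;
        current.Append(c);
        if (c == end) { frames.Add(current.ToString()); inFrame = false; }
    }
    return frames;
}

var frameList = FramesBetween("<a><bc><d>", '<', '>');
Console.WriteLine(string.Join(" ", frameList)); // <a> <bc> <d>
```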

Subscription & Side Effects

var stream = Observable.Range(1, 3)
    .DoOnSubscribe(() => Console.WriteLine("Subscribed"))
    .DoOnDispose(() => Console.WriteLine("Disposed"));

using (stream.Subscribe(Console.WriteLine))
{
    // auto dispose at using end
}

Utility & Miscellaneous

// Emit list contents quickly with low allocations
var listSource = Observable.Return<IEnumerable<int>>(new List<int> { 1, 2, 3 });
listSource.ForEach().Subscribe(Console.WriteLine);

// Using helper for deterministic disposal
var value = new MemoryStream().Using(ms => ms.Length);

// While loop (reactive)
var counter = 0;
ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter))
                   .Subscribe();

// Batch push with OnNext params
var subj = new Subject<int>();
subj.OnNext(1, 2, 3, 4);

Performance Notes

  • FastForEach path avoids iterator allocations for List<T>, IList<T>, and arrays.
  • SyncTimer ensures only one shared timer per period, reducing timer overhead.
  • Conflate helps tame high-frequency producers without dropping the final value of a burst.
  • Heartbeat and DetectStale use lightweight scheduling primitives.
  • Most operators avoid capturing lambdas in hot loops where practical.
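
The first bullet describes a common technique: dispatch on the concrete collection type and use an indexed loop so that no enumerator object is allocated. The sketch below illustrates that technique in isolation. FastForEachSketch is a hypothetical name, and the library's actual fast path may be structured differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: indexed loops for arrays and lists, enumerator fallback otherwise.
static void FastForEachSketch<T>(IEnumerable<T> source, Action<T> onNext)
{
    switch (source)
    {
        case T[] array: // checked first because arrays also implement IList<T>
            for (var i = 0; i < array.Length; i++) onNext(array[i]);
            break;
        case List<T> list:
            for (var i = 0; i < list.Count; i++) onNext(list[i]);
            break;
        case IList<T> indexable:
            for (var i = 0; i < indexable.Count; i++) onNext(indexable[i]);
            break;
        default: // general case: falls back to an enumerator
            foreach (var item in source) onNext(item);
            break;
    }
}

var seen = new List<int>();
FastForEachSketch(new List<int> { 1, 2, 3 }, seen.Add);
FastForEachSketch(new[] { 4, 5 }, seen.Add);
Console.WriteLine(string.Join(", ", seen)); // 1, 2, 3, 4, 5
```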

Thread Safety

  • All operators are pure functional transformations unless documented otherwise.
  • SyncTimer uses a ConcurrentDictionary and returns a hot IConnectableObservable that connects once per unique TimeSpan.
  • Methods returning shared observables (SyncTimer, Partition result sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting.
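
The SyncTimer description above (a ConcurrentDictionary keyed by TimeSpan, one connected hot observable per key) can be sketched with standard System.Reactive pieces. This is an illustration of the pattern only. SharedTimer is a hypothetical function, the long element type is an assumption, and the Lazy wrapper is one possible way to make sure Connect runs once per period.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;

// One lazily created, already-connected timer per distinct period.
var timers = new ConcurrentDictionary<TimeSpan, Lazy<IObservable<long>>>();

// Hypothetical function: returns the shared hot timer for `period`.
IObservable<long> SharedTimer(TimeSpan period) =>
    timers.GetOrAdd(period, p => new Lazy<IObservable<long>>(() =>
    {
        var published = Observable.Interval(p).Publish();
        published.Connect(); // Lazy guarantees this runs once per period
        return published;
    })).Value;

var first = SharedTimer(TimeSpan.FromSeconds(1));
var second = SharedTimer(TimeSpan.FromSeconds(1));
Console.WriteLine(ReferenceEquals(first, second)); // True
```

Because the returned sequence is hot, late subscribers join the timer in progress and do not restart it.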

License

MIT. See the LICENSE file.


Contributing

Issues / PRs welcome. Please keep additions dependency-free and focused on broadly useful reactive patterns.


Change Log (Excerpt)

(Keep this section updated as the library evolves.)

  • Added async task projection helpers (SelectAsyncSequential, SelectLatestAsync, SelectAsyncConcurrent).
  • Added liveness operators (Heartbeat, DetectStale, BufferUntilInactive).
  • Added resilience (RetryWithBackoff, expanded OnErrorRetry overloads).
  • Added flow control (Conflate, ThrottleFirst, DebounceImmediate).

Happy reactive coding!

Version Downloads Last Updated
2.0.1 290 9/16/2025