Nethereum.RPC.Reactive 5.8.0

Prefix Reserved
dotnet add package Nethereum.RPC.Reactive --version 5.8.0
                    
NuGet\Install-Package Nethereum.RPC.Reactive -Version 5.8.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Nethereum.RPC.Reactive" Version="5.8.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Nethereum.RPC.Reactive" Version="5.8.0" />
                    
Directory.Packages.props
<PackageReference Include="Nethereum.RPC.Reactive" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Nethereum.RPC.Reactive --version 5.8.0
                    
#r "nuget: Nethereum.RPC.Reactive, 5.8.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Nethereum.RPC.Reactive@5.8.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Nethereum.RPC.Reactive&version=5.8.0
                    
Install as a Cake Addin
#tool nuget:?package=Nethereum.RPC.Reactive&version=5.8.0
                    
Install as a Cake Tool

Nethereum.RPC.Reactive

Nethereum.RPC.Reactive provides Reactive Extensions (Rx.NET) support for Ethereum RPC operations. It enables reactive programming patterns for monitoring blockchain events, streaming blocks and transactions, and building real-time Ethereum applications using observables.

Note: This package builds on Nethereum.JsonRpc.WebSocketStreamingClient, which provides the core IObservable infrastructure for WebSocket subscriptions. For detailed documentation on Observable handlers and subscription internals, see the Nethereum.JsonRpc.WebSocketStreamingClient package.

Features

  • WebSocket Subscriptions - eth_subscribe support for real-time events
    • New block headers (eth_subscribe + newHeads)
    • Pending transactions (eth_subscribe + newPendingTransactions)
    • Event logs (eth_subscribe + logs)
  • Polling-Based Streams - Reactive streams using HTTP/RPC polling
    • Block streams with configurable polling intervals
    • Transaction streams
    • Pending transaction streams
  • Observable Operators - Rx.NET operators for blockchain data
  • Flexible Polling - Custom polling strategies
  • Event Processing - LINQ-style queries over blockchain events
  • Backpressure Handling - Built-in support for managing event flow

Installation

dotnet add package Nethereum.RPC.Reactive

Dependencies

  • Nethereum.RPC - Core RPC functionality
  • System.Reactive (4.1.3+) - Reactive Extensions

Quick Start

WebSocket Subscriptions (Real-time)

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");

// Subscribe to new block headers
var subscription = new EthNewBlockHeadersObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block =>
{
    Console.WriteLine($"New block: {block.Number.Value}");
    Console.WriteLine($"Block hash: {block.BlockHash}");
    Console.WriteLine($"Timestamp: {block.Timestamp.Value}");
});

// Handle subscription confirmation
subscription.GetSubscribeResponseAsObservable().Subscribe(subscriptionId =>
{
    Console.WriteLine($"Subscription ID: {subscriptionId}");
});

await client.StartAsync();
await subscription.SubscribeAsync();

// Keep running...
await Task.Delay(Timeout.Infinite);

Polling-Based Streams (HTTP/RPC)

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Stream new blocks as they arrive
web3.Eth.GetBlocksWithTransactionHashes()
    .Subscribe(block =>
    {
        Console.WriteLine($"New block: {block.Number}");
        Console.WriteLine($"Transaction count: {block.TransactionHashes.Length}");
    });

// Keep running...
await Task.Delay(Timeout.Infinite);

Subscription Types

EthNewBlockHeadersObservableSubscription

Subscribe to new block headers in real-time via WebSocket:

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthNewBlockHeadersObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(
        block => Console.WriteLine($"Block {block.Number.Value}: {block.BlockHash}"),
        error => Console.WriteLine($"Error: {error.Message}"),
        () => Console.WriteLine("Subscription completed")
    );

await client.StartAsync();
await subscription.SubscribeAsync();

EthNewPendingTransactionObservableSubscription

Monitor pending transactions in the mempool:

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthNewPendingTransactionObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(txHash =>
    {
        Console.WriteLine($"Pending transaction: {txHash}");
    });

await client.StartAsync();
await subscription.SubscribeAsync();

EthLogsObservableSubscription

Subscribe to event logs with filters:

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthLogsObservableSubscription(client);

// Filter for specific contract events
var filterInput = new NewFilterInput
{
    Address = new[] { "0x6B175474E89094C44Da98b954EedeAC495271d0F" }, // DAI contract
    Topics = new[] { "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" } // Transfer event
};

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(log =>
    {
        Console.WriteLine($"Log from block: {log.BlockNumber.Value}");
        Console.WriteLine($"Transaction: {log.TransactionHash}");
    });

await client.StartAsync();
await subscription.SubscribeAsync(filterInput);

Polling Extensions

Block Streaming

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Stream blocks with transaction hashes
web3.Eth.GetBlocksWithTransactionHashes()
    .Subscribe(block =>
    {
        Console.WriteLine($"Block: {block.Number}");
        Console.WriteLine($"Transactions: {block.TransactionHashes.Length}");
    });

// Stream blocks with full transaction details
web3.Eth.GetBlocksWithTransactions()
    .Subscribe(block =>
    {
        Console.WriteLine($"Block: {block.Number}");
        foreach (var tx in block.Transactions)
        {
            Console.WriteLine($"  From: {tx.From} To: {tx.To} Value: {tx.Value}");
        }
    });

// Stream specific block range
web3.Eth.GetBlocksWithTransactionHashes(
    start: new BlockParameter(18000000),
    end: new BlockParameter(18000100)
).Subscribe(block =>
{
    Console.WriteLine($"Historical block: {block.Number}");
});

From: src/Nethereum.RPC.Reactive/Polling/BlockStreamExtensions.cs:10

Custom Polling Interval

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Custom poller - check every 5 seconds
var customPoller = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5))
    .Select(_ => Unit.Default);

web3.Eth.GetBlocksWithTransactionHashes(poller: customPoller)
    .Subscribe(block =>
    {
        Console.WriteLine($"Block (5s interval): {block.Number}");
    });

From: src/Nethereum.RPC.Reactive/Polling/PollingExtensions.cs:15

Examples

Example 1: Real-time Block Monitor with Error Handling

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;

public class BlockMonitor
{
    private StreamingWebSocketClient client;
    private EthNewBlockHeadersObservableSubscription subscription;

    public async Task StartAsync(string wsUrl)
    {
        client = new StreamingWebSocketClient(wsUrl);
        client.Error += Client_Error;

        subscription = new EthNewBlockHeadersObservableSubscription(client);

        // Subscribe to blocks
        subscription.GetSubscriptionDataResponsesAsObservable()
            .Subscribe(
                onNext: block =>
                {
                    Console.WriteLine($"Block {block.Number.Value}");
                    Console.WriteLine($"   Hash: {block.BlockHash}");
                    Console.WriteLine($"   Time: {DateTimeOffset.FromUnixTimeSeconds((long)block.Timestamp.Value)}");
                    Console.WriteLine($"   Transactions: {block.TransactionHashes?.Length ?? 0}");
                },
                onError: error =>
                {
                    Console.WriteLine($"Subscription error: {error.Message}");
                },
                onCompleted: () =>
                {
                    Console.WriteLine("Subscription completed");
                }
            );

        // Log subscription confirmation
        subscription.GetSubscribeResponseAsObservable()
            .Subscribe(subscriptionId =>
            {
                Console.WriteLine($"Subscribed with ID: {subscriptionId}");
            });

        await client.StartAsync();
        await subscription.SubscribeAsync();
    }

    private async void Client_Error(object sender, Exception ex)
    {
        Console.WriteLine($"WebSocket error: {ex.Message}");
        Console.WriteLine("Attempting to reconnect...");

        // Stop and restart
        await ((StreamingWebSocketClient)sender).StopAsync();
        await StartAsync(((StreamingWebSocketClient)sender).Path);
    }

    public async Task StopAsync()
    {
        if (client != null)
        {
            await client.StopAsync();
        }
    }
}

// Usage
var monitor = new BlockMonitor();
await monitor.StartAsync("ws://127.0.0.1:8546");

From: consoletests/Nethereum.Parity.Reactive.ConsoleTest/Program.cs:32

Example 2: Custom Account Balance Monitoring (Parity PubSub)

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.RpcStreaming;
using Nethereum.RPC.Eth;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Hex.HexTypes;

// Custom PubSub subscription for account balance changes
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
client.Error += (sender, ex) => Console.WriteLine($"Error: {ex.Message}");

var accountAddress = "0x12890d2cce102216644c59daE5baed380d84830c";

// Create balance subscription (Parity-specific)
var balanceSubscription = new ParityPubSubObservableSubscription<HexBigInteger>(client);

// Build balance query request
var ethBalanceRequest = new EthGetBalance().BuildRequest(
    accountAddress,
    BlockParameter.CreateLatest()
);

// Subscribe to balance updates
balanceSubscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(
        newBalance => Console.WriteLine($"New Balance: {Web3.Convert.FromWei(newBalance.Value)} ETH"),
        onError => Console.WriteLine($"Error: {onError.Message}")
    );

// Log subscription confirmation
balanceSubscription.GetSubscribeResponseAsObservable()
    .Subscribe(x => Console.WriteLine($"Balance subscription ID: {x}"));

await client.StartAsync();
await balanceSubscription.SubscribeAsync(ethBalanceRequest);

// Keep monitoring
await Task.Delay(Timeout.Infinite);

From: consoletests/Nethereum.Parity.Reactive.ConsoleTest/Program.cs:35

Example 3: Filtering Blocks with LINQ Operators

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Only blocks with more than 100 transactions
web3.Eth.GetBlocksWithTransactionHashes()
    .Where(block => block.TransactionHashes.Length > 100)
    .Subscribe(block =>
    {
        Console.WriteLine($"Busy block: {block.Number} ({block.TransactionHashes.Length} txs)");
    });

// Calculate average transactions per block (last 10 blocks)
web3.Eth.GetBlocksWithTransactionHashes()
    .Buffer(10) // Group last 10 blocks
    .Select(blocks => blocks.Average(b => b.TransactionHashes.Length))
    .Subscribe(avgTxs =>
    {
        Console.WriteLine($"Average transactions per block (last 10): {avgTxs:F2}");
    });

// Alert on large base fee
web3.Eth.GetBlocksWithTransactionHashes()
    .Where(block => block.BaseFeePerGas?.Value > 100000000000) // > 100 gwei
    .Subscribe(block =>
    {
        Console.WriteLine($"High base fee alert! Block {block.Number}: {block.BaseFeePerGas.Value} wei");
    });

Example 4: Transaction Stream with Filtering

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Monitor large ETH transfers
web3.Eth.GetBlocksWithTransactions()
    .SelectMany(block => block.Transactions) // Flatten to transaction stream
    .Where(tx => tx.Value?.Value > Web3.Convert.ToWei(100)) // > 100 ETH
    .Subscribe(tx =>
    {
        var ethValue = Web3.Convert.FromWei(tx.Value.Value);
        Console.WriteLine($"Whale transfer: {ethValue} ETH");
        Console.WriteLine($"   From: {tx.From}");
        Console.WriteLine($"   To: {tx.To}");
        Console.WriteLine($"   Tx: {tx.TransactionHash}");
    });

Example 5: Pending Transaction Monitoring

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.Web3;
using System.Reactive.Linq;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var web3 = new Web3(client);

var subscription = new EthNewPendingTransactionObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Buffer(TimeSpan.FromSeconds(5)) // Group pending txs every 5 seconds
    .Subscribe(async txHashes =>
    {
        Console.WriteLine($"{txHashes.Count} pending transactions in last 5 seconds");

        // Get details for first pending transaction
        if (txHashes.Count > 0)
        {
            var tx = await web3.Eth.Transactions.GetTransactionByHash.SendRequestAsync(txHashes[0]);
            if (tx != null)
            {
                Console.WriteLine($"   Sample tx: {tx.TransactionHash}");
                Console.WriteLine($"   Gas price: {tx.GasPrice?.Value ?? 0} wei");
            }
        }
    });

await client.StartAsync();
await subscription.SubscribeAsync();

Example 6: Event Log Streaming with Decoding

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Contracts;
using Nethereum.ABI.FunctionEncoding.Attributes;

// Transfer event DTO
[Event("Transfer")]
public class TransferEventDTO : IEventDTO
{
    [Parameter("address", "from", 1, true)]
    public string From { get; set; }

    [Parameter("address", "to", 2, true)]
    public string To { get; set; }

    [Parameter("uint256", "value", 3, false)]
    public BigInteger Value { get; set; }
}

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthLogsObservableSubscription(client);

// Filter for USDC transfers
var filterInput = new NewFilterInput
{
    Address = new[] { "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" }, // USDC
    Topics = new[] { Event<TransferEventDTO>.GetEventABI().Sha3Signature } // Transfer signature
};

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(log =>
    {
        try
        {
            var decoded = Event<TransferEventDTO>.DecodeEvent(log);
            var amount = Web3.Convert.FromWei(decoded.Event.Value, 6); // USDC has 6 decimals

            Console.WriteLine($"USDC Transfer: {amount} USDC");
            Console.WriteLine($"   From: {decoded.Event.From}");
            Console.WriteLine($"   To: {decoded.Event.To}");
            Console.WriteLine($"   Block: {log.BlockNumber.Value}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error decoding: {ex.Message}");
        }
    });

await client.StartAsync();
await subscription.SubscribeAsync(filterInput);

Example 7: Combining Multiple Observables

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");

var blockSubscription = new EthNewBlockHeadersObservableSubscription(client);
var pendingTxSubscription = new EthNewPendingTransactionObservableSubscription(client);

// Combine streams
var blocks = blockSubscription.GetSubscriptionDataResponsesAsObservable()
    .Select(block => $"Block: {block.Number.Value}");

var pendingTxs = pendingTxSubscription.GetSubscriptionDataResponsesAsObservable()
    .Buffer(TimeSpan.FromSeconds(10))
    .Select(txs => $"Pending: {txs.Count} txs");

// Merge and display
Observable.Merge(blocks, pendingTxs)
    .Subscribe(message => Console.WriteLine(message));

await client.StartAsync();
await blockSubscription.SubscribeAsync();
await pendingTxSubscription.SubscribeAsync();

Example 8: Backpressure with Throttling

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Throttle block stream to process at most 1 block per second
web3.Eth.GetBlocksWithTransactionHashes()
    .Throttle(TimeSpan.FromSeconds(1))
    .Subscribe(block =>
    {
        Console.WriteLine($"Processing block: {block.Number}");
        // Expensive processing here
    });

// Sample blocks - only process every 10th block
web3.Eth.GetBlocksWithTransactionHashes()
    .Buffer(10)
    .Select(blocks => blocks.Last()) // Take last block from each group
    .Subscribe(block =>
    {
        Console.WriteLine($"Sampled block: {block.Number}");
    });

Example 9: Historical Block Range Processing

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using Nethereum.RPC.Eth.DTOs;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Process blocks from 18,000,000 to 18,001,000
var startBlock = new BlockParameter(18000000);
var endBlock = new BlockParameter(18001000);

web3.Eth.GetBlocksWithTransactionHashes(startBlock, endBlock)
    .Do(block => Console.WriteLine($"Processing: {block.Number}"))
    .SelectMany(block => block.TransactionHashes) // Flatten to transaction hashes
    .Count()
    .Subscribe(totalTxs =>
    {
        Console.WriteLine($"Total transactions in range: {totalTxs}");
    });

Example 10: Error Recovery and Retry

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;

var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthNewBlockHeadersObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Retry(3) // Retry up to 3 times on error
    .Catch<Block, Exception>(ex =>
    {
        Console.WriteLine($"Error after retries: {ex.Message}");
        return Observable.Empty<Block>(); // Complete gracefully
    })
    .Subscribe(block =>
    {
        Console.WriteLine($"Block: {block.Number.Value}");
    });

await client.StartAsync();
await subscription.SubscribeAsync();

Advanced Patterns

Custom Observable Handlers

using Nethereum.JsonRpc.Client;
using Nethereum.RPC.Reactive.RpcStreaming;
using System.Reactive.Linq;

// Create custom observable for any RPC method
var client = new RpcClient(new Uri("https://mainnet.infura.io/v3/YOUR-PROJECT-ID"));

var blockNumberObservable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
    .SelectMany(async _ =>
    {
        var ethBlockNumber = new Nethereum.RPC.Eth.Blocks.EthBlockNumber(client);
        return await ethBlockNumber.SendRequestAsync();
    });

blockNumberObservable.Subscribe(blockNumber =>
{
    Console.WriteLine($"Current block: {blockNumber.Value}");
});

Stream Composition

using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");

// Create complex stream pipelines
var largeBlockTransfers = web3.Eth.GetBlocksWithTransactions()
    .SelectMany(block => block.Transactions)
    .Where(tx => tx.Value?.Value > Web3.Convert.ToWei(10))
    .GroupBy(tx => tx.From)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromMinutes(1))
        .Where(txs => txs.Count >= 3) // 3+ large transfers in 1 minute from same address
        .Select(txs => new { From = group.Key, Transactions = txs })
    );

largeBlockTransfers.Subscribe(result =>
{
    Console.WriteLine($"Suspicious activity from {result.From}");
    Console.WriteLine($"   {result.Transactions.Count} large transfers in 1 minute");
});

Best Practices

  1. Use WebSocket Subscriptions for Real-time Data: More efficient than polling

    // Good - WebSocket subscription (real-time)
    var subscription = new EthNewBlockHeadersObservableSubscription(client);
    
    // Less efficient - HTTP polling
    web3.Eth.GetBlocksWithTransactionHashes()
    
  2. Handle Errors Gracefully: Always provide error handlers

    observable.Subscribe(
        onNext: data => ProcessData(data),
        onError: ex => LogError(ex),
        onCompleted: () => Console.WriteLine("Completed")
    );
    
  3. Dispose Subscriptions: Clean up resources when done

    var disposable = observable.Subscribe(...);
    // Later...
    disposable.Dispose();
    
  4. Use Appropriate Polling Intervals: Balance freshness vs. resource usage

    // For fast-changing data
    var fastPoller = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
    
    // For slow-changing data
    var slowPoller = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(30));
    
  5. Apply Backpressure Operators: Prevent overwhelming downstream consumers

    observable
        .Throttle(TimeSpan.FromSeconds(1))  // Max 1 per second
        .Buffer(10)                          // Process in batches of 10
        .Sample(TimeSpan.FromSeconds(5))    // Sample every 5 seconds
    
  6. Reconnect on WebSocket Errors: Implement automatic reconnection

    client.Error += async (sender, ex) =>
    {
        await ((StreamingWebSocketClient)sender).StopAsync();
        await ReconnectAsync();
    };
    

Rx.NET Operators for Blockchain Data

Common operators for blockchain data processing:

Operator Use Case Example
Where Filter blocks/transactions Where(block => block.Number > 1000000)
Select Transform data Select(block => block.Number.Value)
SelectMany Flatten nested data SelectMany(block => block.Transactions)
Buffer Group items by count/time Buffer(TimeSpan.FromSeconds(10))
Throttle Rate limiting Throttle(TimeSpan.FromSeconds(1))
Sample Periodic sampling Sample(TimeSpan.FromSeconds(5))
Take Limit results Take(100) // First 100 blocks
Skip Skip initial items Skip(10) // Skip first 10
Distinct Remove duplicates Distinct(tx => tx.TransactionHash)
GroupBy Group by key GroupBy(tx => tx.From)
Merge Combine streams Observable.Merge(blocks, txs)
Zip Combine pairwise blocks.Zip(receipts)
Retry Retry on error Retry(3)
Catch Handle errors Catch<T, Exception>(...)

Polling vs. WebSocket Subscriptions

When to Use Polling

  • HTTP/HTTPS RPC endpoints only
  • Infrequent data access
  • Historical data processing
  • Simple deployment (no WebSocket support needed)

When to Use WebSocket Subscriptions

  • Real-time data (blocks, transactions, events)
  • High-frequency updates
  • Lower latency requirements
  • Reduced server load (push vs. pull)

Troubleshooting

Subscription Not Receiving Data

Observable never fires

Solution: Ensure WebSocket client is started:

await client.StartAsync();
await subscription.SubscribeAsync();

Memory Leaks

Memory usage grows over time

Solution: Always dispose subscriptions:

var disposable = observable.Subscribe(...);
// Later
disposable.Dispose();

Missed Blocks

Blocks are skipped in the stream

Solution: Adjust polling interval or use WebSocket subscriptions for guaranteed delivery.

  • Nethereum.JsonRpc.WebSocketStreamingClient - Core IObservable infrastructure for WebSocket subscriptions (foundation for this package)
  • Nethereum.RPC - Core RPC functionality
  • Nethereum.JsonRpc.WebSocketClient - WebSocket client for subscriptions
  • Nethereum.Web3 - High-level Web3 API
  • System.Reactive - Reactive Extensions library

Additional Resources

License

MIT License - see LICENSE file for details

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 was computed. 
.NET Framework net461 is compatible.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Nethereum.RPC.Reactive:

Package Downloads
Nethereum.Parity.Reactive

Nethereum.Parity.Reactive, Reactive Client Subscriptions pub / sub for Parity

PlangLibrary

Plang language core

Ktrix.Evm

A library to interact with EVM networks

GitHub repositories (2)

Showing the top 2 popular GitHub repositories that depend on Nethereum.RPC.Reactive:

Repository Stars
ChainSafe/web3.unity
🕹 Unity SDK for building games that interact with blockchains.
JayArrowz/PancakeTokenSniper
BSC BNB Pancake token sniper, buy, take profit and rug check
Version Downloads Last Updated
5.8.0 212 1/6/2026
5.0.0 17,604 5/28/2025
4.29.0 12,011 2/10/2025
4.28.0 11,130 1/7/2025
4.27.1 564 12/24/2024
4.27.0 275 12/24/2024
4.26.0 7,998 10/1/2024
4.25.0 3,154 9/19/2024
4.21.4 17,191 8/9/2024
4.21.3 1,523 7/22/2024
4.21.2 2,590 6/26/2024
4.21.1 783 6/26/2024
4.21.0 3,348 6/18/2024
4.20.0 8,234 3/28/2024
4.19.0 7,660 2/16/2024
4.18.0 41,992 11/21/2023
4.17.1 19,843 9/28/2023
4.17.0 258 9/27/2023
4.16.0 3,702 8/14/2023
4.15.2 13,603 7/11/2023
4.15.1 312 7/11/2023
4.15.0 322 7/11/2023
4.14.0 16,569 3/19/2023
4.13.0 47,171 2/18/2023
4.12.0 6,701 12/9/2022
4.11.0 16,620 10/27/2022
4.9.0 17,495 9/27/2022
4.8.0 4,027 8/24/2022
4.7.0 9,012 7/20/2022
4.6.1 2,265 6/18/2022
4.6.0 931 6/16/2022
4.5.0 4,993 5/13/2022
4.4.1 4,038 4/27/2022
4.4.0 977 4/27/2022
4.3.0 6,019 4/12/2022
4.2.0 8,182 2/18/2022
4.1.1 18,307 11/4/2021
4.1.0 2,678 10/15/2021
4.0.5 4,781 8/12/2021
4.0.4 1,769 8/10/2021
4.0.3 778 8/8/2021
4.0.2 837 8/5/2021
4.0.1 1,062 7/28/2021
4.0.0 1,327 7/26/2021
3.8.0 14,547 7/3/2020
3.7.1 4,126 2/13/2020
3.7.0 892 2/13/2020
3.6.0 982 1/27/2020
3.5.0 1,113 12/31/2019
3.4.0 9,443 7/29/2019
3.3.0 1,228 4/23/2019
3.2.0 1,082 4/8/2019
3.1.2 5,553 3/13/2019
3.1.1 1,020 3/12/2019
3.1.0 1,097 3/12/2019