SakontStack.ReactiveStream
1.2.1
dotnet add package SakontStack.ReactiveStream --version 1.2.1
NuGet\Install-Package SakontStack.ReactiveStream -Version 1.2.1
<PackageReference Include="SakontStack.ReactiveStream" Version="1.2.1" />
paket add SakontStack.ReactiveStream --version 1.2.1
#r "nuget: SakontStack.ReactiveStream, 1.2.1"
// Install SakontStack.ReactiveStream as a Cake Addin #addin nuget:?package=SakontStack.ReactiveStream&version=1.2.1 // Install SakontStack.ReactiveStream as a Cake Tool #tool nuget:?package=SakontStack.ReactiveStream&version=1.2.1
Reactive Stream
a Stream
wrapper that provides Read/Write progress reporting through IProgress<StreamProgress>
and IObservable<StreamProgress>
with speed throttling for Read/Write streams separately.
Version | Downloads |
---|---|
Example Usage:
Report file download progress:
- progress using
IProgress<StreamProgress>
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;
var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");
var client = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length = long.Parse(response.Content.Headers
.First(h => h.Key.Equals("Content-Length"))
.Value
.First());
var progress = new Progress<ReactiveStream.StreamProgress>(p => Console
.WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}" +
$"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()} " +
$"({p.Percentage:N2}%) " +
$"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"));
var stream = new ReactiveStream(new MemoryStream(), progress: progress, totalLength: length);
// stream progress will only start reporting progress after you subscribe to it
stream.Subscribe();
await response.Content.CopyToAsync(stream);
NOTE: setting custom report interval while using IProgress<StreamProgress>
alone, requires using stream.Sample()
operator.
- progress using
IObservable<StreamProgress>
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;
var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");
var client = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length = long.Parse(response.Content.Headers
.First(h => h.Key.Equals("Content-Length"))
.Value
.First());
var stream = new ReactiveStream(new MemoryStream(), totalLength: length);
stream
.Sample(TimeSpan.FromSeconds(0.25)) // Only report progress each 0.25 seconds
.Do(p => Console
.WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}"+
$"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()}"+
$"({p.Percentage:N2}%) "+
$"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"))
.Subscribe();
await response.Content.CopyToAsync(stream);
- setting download speed limits:
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;
var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");
var client = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length = long.Parse(response.Content.Headers
.First(h => h.Key.Equals("Content-Length"))
.Value
.First());
var stream = new ReactiveStream(new MemoryStream(), totalLength: length,
configureStream: s =>
{
// 1 MB/sec speed limit for write streams;
s.WriteSpeedLimit = 1024 * 1024;
});
stream
.Sample(TimeSpan.FromSeconds(0.25)) // Only report progress each 0.25 seconds
.Do(p => Console
.WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}"+
$"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()}"+
$"({p.Percentage:N2}%) "+
$"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"))
.Subscribe();
await response.Content.CopyToAsync(stream);
- changing speed limits in realtime:
stream.ModifyOptions(x => x.WriteSpeedLimit = null);
ByteSizeLib
library is not a dependency, and only used in this example to provide sample codes that will print progress with better formatted byte sizes
example output:
...
Downloaded 4.7 MiB/4.69 GiB (0.10%) 680 KiB/sec
Downloaded 5.03 MiB/4.69 GiB (0.10%) 1016 KiB/sec
Downloaded 5.04 MiB/4.69 GiB (0.10%) 1 MiB/sec
Downloaded 5.36 MiB/4.69 GiB (0.11%) 328 KiB/sec
Downloaded 5.7 MiB/4.69 GiB (0.12%) 680 KiB/sec
API:
the ReactiveStream
class implements both Stream
& IObservable<ReactiveStream.StreamProgress>
public class ReactiveStream : Stream, IObservable<StreamProgress>
{
//* ... *//
}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net6.0
- System.Reactive (>= 5.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on SakontStack.ReactiveStream:
Package | Downloads |
---|---|
QwikHosting.Deno
Host Qwik app using Deno and ASP.NET |
GitHub repositories
This package is not used by any popular GitHub repositories.