LanguageExt.Streaming 5.0.0-beta-54

This is a prerelease version of LanguageExt.Streaming.
.NET CLI:
    dotnet add package LanguageExt.Streaming --version 5.0.0-beta-54

Package Manager (run from the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package):
    NuGet\Install-Package LanguageExt.Streaming -Version 5.0.0-beta-54

PackageReference (for projects that support PackageReference, copy this XML node into the project file):
    <PackageReference Include="LanguageExt.Streaming" Version="5.0.0-beta-54" />

Central Package Management (CPM) - copy this XML node into the solution's Directory.Packages.props file to version the package:
    <PackageVersion Include="LanguageExt.Streaming" Version="5.0.0-beta-54" />
and reference it from the project file with:
    <PackageReference Include="LanguageExt.Streaming" />

Paket CLI:
    paket add LanguageExt.Streaming --version 5.0.0-beta-54

Script & Interactive (the #r directive can be used in F# Interactive and Polyglot Notebooks):
    #r "nuget: LanguageExt.Streaming, 5.0.0-beta-54"

Cake Addin:
    #addin nuget:?package=LanguageExt.Streaming&version=5.0.0-beta-54&prerelease

Cake Tool:
    #tool nuget:?package=LanguageExt.Streaming&version=5.0.0-beta-54&prerelease

The Streaming library of language-ext is all about compositional streams. There are two key types of streaming functionality: closed streams and open streams.

Closed streams

Closed streams are facilitated by the Pipes system. The types in the Pipes system are compositional monad-transformers that 'fuse' together to produce an EffectT<M, A>. This effect is a closed system, meaning that there is no way (from the API) to directly interact with the effect from the outside: it can be executed and will return a result if it terminates.

The pipeline components are:

  • ProducerT - yields values downstream
  • PipeT - transforms values as they flow through
  • ConsumerT - awaits values from upstream
  • EffectT - the fully fused, runnable pipeline

These are the components that fuse together (using the | operator) to make an EffectT<M, A>. The types are monad-transformers that only support the lifting of monads with the MonadIO trait (which constrains M). This makes sense: otherwise the closed system would have no effect other than heating up the CPU.
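As a rough sketch of the shape of a fused pipeline (the constructor names, generic-parameter order, and the Run call below are assumptions for illustration; only the | fusion into EffectT<M, A> is specified above):

    using System;
    using LanguageExt;
    using LanguageExt.Pipes;                 // assumed namespace for the pipes types
    using static LanguageExt.Prelude;

    // NOTE: yieldAll / map / each are assumed constructor names, not verified API
    ProducerT<int, IO, Unit>  numbers = ProducerT.yieldAll<IO, int>(new[] { 1, 2, 3 });
    PipeT<int, int, IO, Unit> doubler = PipeT.map<IO, int, int>(x => x * 2);
    ConsumerT<int, IO, Unit>  printer = ConsumerT.each<IO, int>(
        x => IO.lift(() => { Console.WriteLine(x); return unit; }));

    // Fusing with `|` produces a closed, runnable effect
    EffectT<IO, Unit> pipeline = numbers | doubler | printer;

    // Running the effect gives back the lifted monad (here IO<Unit>), which can then be executed
    IO<Unit> action = pipeline.Run();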

There are also more specialised versions of the above that only support the lifting of the Eff<RT, A> effect-monad:

  • Producer
  • Pipe
  • Consumer
  • Effect

They all fuse together into an Effect<RT, A>.

Pipes are especially useful if you want to build reusable streaming components that you can glue together ad infinitum. Pipes are, arguably, less useful for day-to-day stream processing, like handling events, but your mileage may vary.

More details on the Pipes page.

Open streams

Open streams are closer to what most C# devs have used classically. They are like events or IObservable streams. They yield values and (under certain circumstances) accept inputs.

  • Source and SourceT yield values, synchronously or asynchronously depending on their construction.
  • Sink and SinkT receive values and propagate them through the channel they're attached to.
  • Conduit and ConduitT provide an input transducer (acts like a Sink), an internal buffer, and an output transducer (acts like a Source).

I'm calling these 'open streams' because we can Post values to a Sink/SinkT and we can Reduce values yielded by Source/SourceT. So, they are 'open' for public manipulation, unlike Pipes which fuse the public access away.

Source

Source<A> is the 'classic stream': you can lift any of the following types into it: System.Threading.Channels.Channel<A>, IEnumerable<A>, IAsyncEnumerable<A>, or singleton values. To process a stream, use one of the Reduce or ReduceAsync variants. These take Reducer delegates as arguments and are essentially a fold over the stream of values, producing an aggregated state once the stream has completed. Reducers play a similar role to Subscribe in IObservable streams, but are more principled because they return a value (which can be leveraged to carry state for the duration of the stream).

Source also supports some built-in reducers:

  • Last - aggregates no state; simply returns the last item yielded.
  • Iter - forces evaluation of the stream, aggregating no state and ignoring all yielded values.
  • Collect - adds every yielded value to a Seq<A>, which is returned upon stream completion.
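For example, lifting a sequence and reducing it might look like this (a minimal sketch: Source.lift and the exact Reduce/Collect signatures and return types are assumptions based on the description above, not verified against the beta API):

    using System.Linq;
    using LanguageExt;

    // Lift an IEnumerable<int> into a Source (constructor name assumed)
    Source<int> source = Source.lift(Enumerable.Range(1, 10));

    // A reducer is just a fold: state is threaded through the stream and the
    // final state is returned when the stream completes (assumed to be awaitable)
    var total = await source.ReduceAsync(0, (state, x) => state + x);

    // Built-in reducers
    var all  = await source.Collect();   // Seq<int> of every yielded value
    var last = await source.Last();      // just the final value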

SourceT

SourceT<M, A> is the classic stream embellished: it turns the stream into a monad-transformer that can lift any MonadIO-enabled monad (M), allowing side effects to be embedded into the stream in a principled way.

So, for example, to use the IO<A> monad with SourceT, simply use: SourceT<IO, A>. Then you can use one of the following static methods on the SourceT type to lift IO<A> effects into a stream:

  • SourceT.liftM(IO<A> effect) creates a singleton-stream
  • SourceT.foreverM(IO<A> effect) creates an infinite stream, repeating the same effect over and over
  • SourceT.liftM(Channel<IO<A>> channel) lifts a System.Threading.Channels.Channel of effects
  • SourceT.liftM(IEnumerable<IO<A>> effects) lifts an IEnumerable of effects
  • SourceT.liftM(IAsyncEnumerable<IO<A>> effects) lifts an IAsyncEnumerable of effects

Obviously, when lifting non-IO monads, the types above change.

SourceT also supports the same built-in convenience reducers as Source (Last, Iter, Collect).
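For example (a sketch using the constructors listed above; the generic-argument inference and the reducer's return shape are assumptions):

    using System;
    using LanguageExt;

    // A singleton stream containing one effect
    SourceT<IO, DateTime> once  = SourceT.liftM(IO.lift(() => DateTime.UtcNow));

    // An infinite stream that re-runs the same effect for every element
    SourceT<IO, string>   lines = SourceT.foreverM(IO.lift(() => Console.ReadLine() ?? ""));

    // Reducing a SourceT yields its result wrapped in the lifted monad (here IO)
    var collected = once.Collect();      // something like K<IO, Seq<DateTime>>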

Sink

Sink<A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.

This manipulation is possible because the Sink is a CoFunctor (contravariant functor), the dual of Functor: where Functor.Map converts a value from A -> B, CoFunctor.Comap converts from B -> A.

So, to manipulate values coming into the Sink, use Comap. It will give you a new Sink with the manipulation 'built-in'.
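For example (a sketch: how the Sink<string> is obtained is left out, and Post's return value, if any, is ignored; only Comap and Post are named in this document):

    using LanguageExt;

    // Assume we already hold a Sink<string>, e.g. taken from a Conduit (see below)
    Sink<string> stringSink = null!;

    // Comap runs B -> A *before* the value reaches the buffer, giving a Sink<int>
    Sink<int> intSink = stringSink.Comap((int x) => $"value: {x}");

    intSink.Post(42);   // "value: 42" is what actually lands in the buffer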

SinkT

SinkT<M, A> is the transformer version of Sink<A>. Like Sink, it accepts many input values and buffers them until they are consumed, and, being a CoFunctor, it lets you use Comap to manipulate values just before they are posted to the buffer, giving a new SinkT with the manipulation 'built-in'.

SinkT is also a transformer that lifts types of K<M, A>.

Conduit

Conduit<A, B> can be pictured like so:

+----------------------------------------------------------------+
|                                                                |
|  A --> Transducer --> X --> Buffer --> X --> Transducer --> B  |
|                                                                |
+----------------------------------------------------------------+
  • A value of A is posted to the Conduit (via Post)
  • It flows through the input Transducer, mapping the A value to X (an internal type you can't see)
  • The X value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
  • Any invocation of Reduce forces consumption of the values in the buffer, flowing each X through the output Transducer to yield B values

So, the input and output transducers allow for pre- and post-processing of values as they flow through the conduit.
Conduit is a CoFunctor: call Comap to manipulate the pre-processing transducer. It is also a Functor: call Map to manipulate the post-processing transducer. There are other common, non-trait behaviours too, like FoldWhile, Filter, Skip, Take, etc.

Conduit supports access to a Sink and a Source for more advanced processing.
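Putting that together (a sketch: the Conduit constructor name is a guess; Post, Comap, Map, and Reduce are the operations described above, and the awaitability of Reduce is assumed):

    using LanguageExt;

    // Hypothetical constructor: a conduit whose internal buffer carries strings
    Conduit<string, string> conduit = Conduit.make<string>();

    // Pre-process on the way in (CoFunctor) and post-process on the way out (Functor)
    Conduit<int, int> numeric = conduit.Comap((int x) => x.ToString())
                                       .Map(int.Parse);

    numeric.Post(1);
    numeric.Post(2);

    // Reduce drains the buffer through the output transducer, folding to a final state
    var sum = await numeric.Reduce(0, (state, x) => state + x);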

ConduitT

ConduitT<M, A, B> can be pictured like so:

+------------------------------------------------------------------------------------------+
|                                                                                          |
|  K<M, A> --> TransducerM --> K<M, X> --> Buffer --> K<M, X> --> TransducerM --> K<M, B>  |
|                                                                                          |
+------------------------------------------------------------------------------------------+
  • A value of K<M, A> is posted to the ConduitT (via Post)
  • It flows through the input TransducerM, mapping the K<M, A> value to K<M, X> (an internal type you can't see)
  • The K<M, X> value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
  • Any invocation of Reduce forces consumption of the values in the buffer, flowing each K<M, X> through the output TransducerM to yield K<M, B> values

So, the input and output transducers allow for pre- and post-processing of values as they flow through the conduit.
ConduitT is a CoFunctor: call Comap to manipulate the pre-processing transducer. It is also a Functor: call Map to manipulate the post-processing transducer. There are other common, non-trait behaviours too, like FoldWhile, Filter, Skip, Take, etc.

ConduitT supports access to a SinkT and a SourceT for more advanced processing.
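A brief sketch of the same shape with effects (the constructor and the Sink/Source accessor names are assumptions; the point is that the lifted monad, here IO, is threaded through every element):

    using LanguageExt;

    // Hypothetical constructor: a ConduitT over IO whose buffer carries int values
    ConduitT<IO, int, string> conduit =
        ConduitT.make<IO, int>()            // assumed constructor
                .Map(x => $"item {x}");     // post-processing on the way out

    // Access the transformer-aware ends for more advanced processing
    SinkT<IO, int>      input  = conduit.Sink;     // accessor names assumed
    SourceT<IO, string> output = conduit.Source;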

Open to closed streams

Clearly, even for 'closed systems' like the Pipes system, it would be beneficial to be able to post values into the streams from the outside. And so, the open-stream components can all be converted into Pipes components like ProducerT and ConsumerT.

  • Conduit and ConduitT support ToProducer, ToProducerT, ToConsumer, and ToConsumerT.
  • Sink and SinkT support ToConsumer and ToConsumerT.
  • Source and SourceT support ToProducer and ToProducerT.

This allows for the ultimate flexibility in your choice of streaming effect. It also allows for efficient concurrency in the more abstract and compositional world of the pipes. In fact, ProducerT.merge, which merges many streams into one, uses ConduitT internally to collect the values and merge them into a single ProducerT.
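For instance (ToProducerT comes from the list above; the ConduitT constructor, ConsumerT.each, IO.Pure, and posting a wrapped value are assumptions, as in the earlier sketches):

    using System;
    using LanguageExt;
    using LanguageExt.Pipes;
    using static LanguageExt.Prelude;

    ConduitT<IO, int, int> conduit = ConduitT.make<IO, int>();   // assumed constructor

    // The outside world can keep posting effectful values into the conduit...
    conduit.Post(IO.Pure(1));

    // ...while the pipes world consumes it as an ordinary ProducerT fused into an EffectT
    EffectT<IO, Unit> effect =
        conduit.ToProducerT()
      | ConsumerT.each<IO, int>(x => IO.lift(() => { Console.WriteLine(x); return unit; }));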

Compatible target frameworks

net8.0 is compatible (included in the package). The platform-specific net8.0 targets (android, browser, ios, maccatalyst, macos, tvos, windows) and the net9.0 and net10.0 families (including their platform-specific variants) are computed as compatible.

NuGet packages (2)

Showing the top 2 NuGet packages that depend on LanguageExt.Streaming:

  • LanguageExt.Parsec - Parser combinators library based on Haskell Parsec. This is part of the LanguageExt functional framework and requires LanguageExt.Core.
  • LanguageExt.Sys - Extensions to the language-ext framework effects system that wrap the IO behaviours from the .NET BCL.


Version         Downloads   Last updated
5.0.0-beta-54   695         25 days ago
5.0.0-beta-53   121         25 days ago
5.0.0-beta-52   158         a month ago
5.0.0-beta-51   143         a month ago