ObservablePipelines 1.0.5

ObservablePipelines

A simple .NET package to achieve a clean pipeline architecture with Observables and injectable pipes.

When to use

This package is useful when you want to state explicitly what happens, step by step. A Pipe is a natural single-responsibility object: it handles exactly one concern, e.g. filtering or transforming.

Additionally, you can construct Pipelines with Dependency Injection, which saves a lot of construction code and avoids unnecessary dependencies.

Introduction

This package contains a few simple interfaces and some logic that let you configure and build Pipelines cleanly, quickly and dynamically with Dependency Injection and IObservables. The only reference needed is "Microsoft.Extensions.DependencyInjection", as the PipelineBuilder uses it internally to construct the Pipelines dynamically.

Observables are well suited to this job, as they already work very well with the package System.Reactive.Linq. When using Observables with this package, you can handle streams of events with LINQ-like queries. They also let you filter the event stream: a pipe can simply emit nothing for an item it wants to drop. This would not be possible with T instead of IObservable<T>, as the pipe would have to return null or something similar.
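
To make the filtering point concrete, here is a minimal sketch that is independent of this package. It only assumes the System.Reactive package is referenced; the names FilterSketch, KeepEven and KeepEvenOrNull are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // Where()
using System.Reactive.Subjects;  // Subject<T>

public static class FilterSketch
{
    // A step over IObservable<T> drops an item by simply not emitting it.
    public static IObservable<int> KeepEven(IObservable<int> source) =>
        source.Where(n => n % 2 == 0);

    // A step over plain T has no "no result" value, so it needs a sentinel such as null.
    public static int? KeepEvenOrNull(int n) => n % 2 == 0 ? (int?)n : null;

    public static List<int> Run()
    {
        var source = new Subject<int>();
        var results = new List<int>();

        // Subscribe first, then push values synchronously through the subject.
        using var subscription = KeepEven(source).Subscribe(n => results.Add(n));
        for (var i = 1; i <= 6; i++)
        {
            source.OnNext(i);
        }

        return results; // contains 2, 4, 6
    }
}
```

With the observable version, downstream steps never see the dropped items, whereas every caller of KeepEvenOrNull has to check for null.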

If you have not yet worked with Observables, I highly recommend checking out this tutorial:

Introduction to Rx

Usage

1. Install the package:

Install-Package ObservablePipelines
or
dotnet add package ObservablePipelines

2. Install System.Reactive.Linq:

Install-Package System.Reactive.Linq
or
dotnet add package System.Reactive.Linq

3. Add Dependencies to DI-Container

using Microsoft.Extensions.DependencyInjection;

services.AddObservablePipelines();

4. Implement IPipe<TIn, TOut>

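using System;
using System.Reactive.Linq;          // Do() and the other Rx operators
using Microsoft.Extensions.Logging;  // ILogger<T>

// IPipe<TIn, TOut> is provided by the ObservablePipelines package.
internal class LoggerPipe : IPipe<ChatMessage, ChatMessage>
{
    private readonly ILogger<LoggerPipe> logger;

    // The logger is resolved from the DI-Container.
    public LoggerPipe(ILogger<LoggerPipe> logger) {
        this.logger = logger
            ?? throw new ArgumentNullException(nameof(logger));
    }

    // Logs every message as a side effect and passes it on unchanged.
    public IObservable<ChatMessage> Handle(IObservable<ChatMessage> source) {
        return source
            .Do(m =>
                logger.LogInformation($"Pipeline triggered for message: '{m.Message}'.")
            );
    }
}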

Inside the Pipe, I recommend using System.Reactive.Linq to handle the event streams. The constructor can take any dependency that you registered in the DI-Container.
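
As a further illustration, a transforming Pipe with an injected dependency could look roughly like the sketch below. Everything here except the Handle signature is an assumption: the IPipe declaration is a local stand-in inferred from LoggerPipe, the two record shapes are guesses that only reuse the property names seen in this README, and ISenderDirectory is a hypothetical service.

```csharp
using System;
using System.Reactive.Linq; // Select()

// Local stand-in with the shape inferred from LoggerPipe; use the package's own interface in real code.
public interface IPipe<TIn, TOut>
{
    IObservable<TOut> Handle(IObservable<TIn> source);
}

// Hypothetical message types for this sketch.
public record ChatMessage(Guid SenderId, string Message);
public record IdentifiedChatMessage(string SenderName, string Message);

// Hypothetical dependency that would be registered in the DI-Container.
public interface ISenderDirectory
{
    string NameOf(Guid senderId);
}

public class MessageTransformPipe : IPipe<ChatMessage, IdentifiedChatMessage>
{
    private readonly ISenderDirectory directory;

    public MessageTransformPipe(ISenderDirectory directory)
    {
        this.directory = directory
            ?? throw new ArgumentNullException(nameof(directory));
    }

    // Maps each incoming message to its identified counterpart, one to one.
    public IObservable<IdentifiedChatMessage> Handle(IObservable<ChatMessage> source) =>
        source.Select(m => new IdentifiedChatMessage(directory.NameOf(m.SenderId), m.Message));
}
```

The Pipe itself stays small because the lookup logic lives in the injected service, which keeps each step focused on a single responsibility.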

6. Inject and use IPipelineBuilder

var pipeline = pipelineBuilder
    .ConfigureOptions(builder => builder
        .Add(new MessageFilterPipeOptions(Guid.Empty))
    )
    .ConfigurePipeline(builder => builder
        .AddSource(chatMessages)
        .AddStep<LoggerPipe, ChatMessage>()
        .AddStep<MessageFilterPipe, ChatMessage>()
        .AddStep<MessageTransformPipe, IdentifiedChatMessage>()
        .AddStep(new ConsoleLoggerPipe())
    )
    .Build();

pipeline.Subscribe(m =>
    logger.LogInformation($"New Message from {m.SenderName}: {m.Message}.")
);

In ConfigureOptions() you can add Options-Instances that were not added to the global DI-Container. The PipelineBuilder clones the global DI-Container and adds all Options-Instances to the clone. With them you can explicitly configure your Pipes for particular use cases and reuse the same Pipe with different configurations.

In ConfigurePipeline(), first add the source of the Pipeline with AddSource(). After that, add your IPipes as generic type arguments, where you also have to specify the output type. Pipes added this way are built using Dependency Injection.

Alternatively, you can add a Pipe instance, in which case you do not need to specify generic type arguments.
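
Conceptually, a Pipeline is a chain in which each Pipe's output stream becomes the next Pipe's input. The sketch below shows that idea by hand, without the builder. It is not the package's implementation: IPipe is a local stand-in with the shape used in this README, and NonEmptyPipe, LengthPipe and ChainSketch are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // Where(), Select()
using System.Reactive.Subjects;  // Subject<T>

// Local stand-in for the package's interface.
public interface IPipe<TIn, TOut>
{
    IObservable<TOut> Handle(IObservable<TIn> source);
}

// Filtering step: drops empty or whitespace-only strings.
public sealed class NonEmptyPipe : IPipe<string, string>
{
    public IObservable<string> Handle(IObservable<string> source) =>
        source.Where(s => !string.IsNullOrWhiteSpace(s));
}

// Transforming step: changes the element type from string to int.
public sealed class LengthPipe : IPipe<string, int>
{
    public IObservable<int> Handle(IObservable<string> source) =>
        source.Select(s => s.Length);
}

public static class ChainSketch
{
    public static List<int> Run()
    {
        var source = new Subject<string>();

        // The output of the first step is the input of the second step.
        IObservable<int> pipeline =
            new LengthPipe().Handle(new NonEmptyPipe().Handle(source));

        var seen = new List<int>();
        using var subscription = pipeline.Subscribe(n => seen.Add(n));

        source.OnNext("hello");
        source.OnNext("   ");   // dropped by NonEmptyPipe
        source.OnNext("hi");

        return seen; // contains 5, 2
    }
}
```

This also shows why the output type has to be stated for each generic step: it determines the input type that the following step must accept.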

Configuration

As the PipelineBuilder uses Dependency Injection, you can add Options-Instances to the builder's ServiceCollection by calling Configure(). The type that you add to the ConfigurationBuilder should be unique, so that it can be injected into the correct Pipe.

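internal record MessageFilterPipeOptions(Guid ReceiverId);

internal class MessageFilterPipe : IPipe<ChatMessage, ChatMessage>
{
    private readonly MessageFilterPipeOptions options;

    // The options instance is injected like any other dependency.
    public MessageFilterPipe(MessageFilterPipeOptions options) {
        this.options = options
            ?? throw new ArgumentNullException(nameof(options));
    }

    // Assumption for illustration: ChatMessage exposes a ReceiverId property.
    // Where() requires "using System.Reactive.Linq;".
    public IObservable<ChatMessage> Handle(IObservable<ChatMessage> source) {
        return source.Where(m => m.ReceiverId == options.ReceiverId);
    }
}

pipelineBuilder
    .Configure(builder => builder
        .Add(new MessageFilterPipeOptions(Guid.Empty))
    );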
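
The idea of copying the global container and adding Options-Instances only to the copy can be pictured with plain Microsoft.Extensions.DependencyInjection calls. This is a rough sketch of the concept, not the PipelineBuilder's actual code; GreetingOptions, Greeter and ScopedOptionsSketch are hypothetical names.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical options type; being unique, it identifies which consumer it belongs to.
public record GreetingOptions(string Prefix);

// Hypothetical consumer that receives its options through the constructor, like a Pipe would.
public class Greeter
{
    private readonly GreetingOptions options;

    public Greeter(GreetingOptions options) => this.options = options;

    public string Greet(string name) => $"{options.Prefix} {name}";
}

public static class ScopedOptionsSketch
{
    public static Greeter Build(IServiceCollection global, GreetingOptions options)
    {
        // Copy the global registrations so the original collection stays untouched.
        IServiceCollection local = new ServiceCollection();
        foreach (var descriptor in global)
        {
            local.Add(descriptor);
        }

        // Register the options instance in the copy only.
        local.AddSingleton(options);

        var provider = local.BuildServiceProvider();

        // Creates a Greeter that is not registered itself, resolving its
        // constructor parameters from the provider.
        return ActivatorUtilities.CreateInstance<Greeter>(provider);
    }
}
```

Calling Build twice with different GreetingOptions yields two differently configured Greeter instances, while the global collection never learns about either options object.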

Types to use

  • IPipe<TIn, TOut>: Pipe interface with an incoming and an outgoing IObservable stream.
  • IPipelineBuilder: Builder to set up a Pipeline with configurations, a source and multiple Pipes.

Compatible target framework(s)

.NET Standard netstandard2.1 is compatible.

Version Downloads Last updated
1.0.5 1,406 1/18/2023
1.0.4 299 12/6/2022
1.0.3 410 10/24/2022
1.0.2 409 10/20/2022
0.0.2 639 3/5/2022
0.0.1 383 3/5/2022