NPipeline.Connectors
0.19.0
NPipeline.Connectors is the storage abstraction layer for the NPipeline framework. It lets pipeline components work with different storage backends (local file system, cloud storage, databases) through one consistent API, with pluggable storage providers selected by scheme-based URI resolution.
About NPipeline
NPipeline is a high-performance, extensible data processing framework for .NET that enables developers to build scalable and efficient pipeline-based applications. It provides a rich set of components for data transformation, aggregation, branching, and parallel processing, with built-in support for resilience patterns and error handling.
Installation
dotnet add package NPipeline.Connectors
Requirements
- .NET 8.0, 9.0, or 10.0
- Microsoft.Extensions.DependencyInjection.Abstractions 10.0.3 or later (for DI support)
Key Features
- Storage Abstraction: Unified interface for accessing different storage systems through a common API
- Scheme-based URI Resolution: Support for standard URI schemes (file, s3, azure, etc.) with automatic provider selection
- Pluggable Provider Architecture: Easy extensibility with custom storage providers
- Stream-based I/O: Efficient async-first operations with minimal memory footprint
- Built-in File System Provider: Out-of-the-box support for local file system operations
- Dependency Injection Support: Seamless integration with Microsoft.Extensions.DependencyInjection
- Configuration-driven Setup: Flexible provider configuration through code or configuration files
- Cross-platform Compatibility: Works on Windows, Linux, and macOS
Key Components
StorageProviderFactory
The StorageProviderFactory provides factory methods to create and configure storage provider resolvers without requiring dependency injection:
// Create a resolver with the built-in file system provider
var resolver = StorageProviderFactory.CreateResolver();

// Create a resolver with additional custom providers
var customProviders = new IStorageProvider[] { new S3StorageProvider(), new AzureBlobStorageProvider() };
var resolverResult = StorageProviderFactory.CreateResolver(new StorageResolverOptions
{
    IncludeFileSystem = true,
    AdditionalProviders = customProviders,
});
var customResolver = resolverResult.Resolver;
// Create from configuration and capture errors
var config = new ConnectorConfiguration
{
    Providers = new Dictionary<string, StorageProviderConfig>
    {
        ["S3"] = new StorageProviderConfig
        {
            ProviderType = "MyApp.S3StorageProvider",
            Enabled = true,
            Settings = new Dictionary<string, string>
            {
                ["Region"] = "us-west-2",
                ["AccessKey"] = "your-access-key"
            }
        }
    }
};

var (configuredResolver, errors) = StorageProviderFactory.CreateResolver(new StorageResolverOptions
{
    Configuration = config,
    CollectErrors = true,
});

if (errors.Count > 0)
{
    // Log or surface configuration issues here
}
// Register a friendly alias for custom providers
StorageProviderFactory.RegisterProviderAlias("s3", typeof(S3StorageProvider));
StorageResolver
The StorageResolver maintains a thread-safe list of explicitly registered providers and resolves them based on URI schemes:
var resolver = new StorageResolver();
// Register providers manually (factory helpers call this for you)
resolver.RegisterProvider(new FileSystemStorageProvider());
resolver.RegisterProvider(new S3StorageProvider());
// Resolve a provider for a specific URI
var fileUri = StorageUri.FromFilePath("./data/input.csv");
var provider = resolver.ResolveProvider(fileUri);
// List all available providers
var providers = resolver.GetAvailableProviders();
FileSystemStorageProvider
The built-in FileSystemStorageProvider handles local file system operations with the "file" scheme:
using NPipeline.StorageProviders;
var provider = new FileSystemStorageProvider();
var fileUri = StorageUri.FromFilePath("./data/output.csv");
// Check if file exists
bool exists = await provider.ExistsAsync(fileUri);
// Open file for reading
using var readStream = await provider.OpenReadAsync(fileUri);
// Open file for writing (creates directories as needed)
using var writeStream = await provider.OpenWriteAsync(fileUri);
// List files in directory
var directoryUri = StorageUri.FromFilePath("./data/");
await foreach (var item in provider.ListAsync(directoryUri, recursive: true))
{
    Console.WriteLine($"{item.Uri} - {item.Size} bytes");
}

// Get file metadata
var metadata = await provider.GetMetadataAsync(fileUri);
if (metadata != null)
{
    Console.WriteLine($"Size: {metadata.Size}, Modified: {metadata.LastModified}");
}
Database Connector Abstractions
NPipeline.Connectors also provides database-agnostic abstractions for implementing database connectors (PostgreSQL, SQL Server, MySQL, Oracle, etc.). These abstractions enable:
- Unified Database API: Common interfaces for database operations across different database systems
- Extensible Base Classes: Ready-to-use base classes for source and sink nodes
- Configuration Management: Standardized configuration with validation
- Error Handling: Comprehensive exception hierarchy for database errors
- Security: Built-in SQL injection prevention through identifier validation
- Retry Logic: Configurable retry policies for transient errors
Core Interfaces
IDatabaseConnection - Database connection abstraction:
public interface IDatabaseConnection : IAsyncDisposable
{
    bool IsOpen { get; }
    Task OpenAsync(CancellationToken cancellationToken = default);
    Task CloseAsync(CancellationToken cancellationToken = default);
    Task<IDatabaseCommand> CreateCommandAsync(CancellationToken cancellationToken = default);
}
IDatabaseCommand - Database command abstraction:
public interface IDatabaseCommand : IAsyncDisposable
{
    string CommandText { get; set; }
    int CommandTimeout { get; set; }
    System.Data.CommandType CommandType { get; set; }
    void AddParameter(string name, object? value);
    Task<IDatabaseReader> ExecuteReaderAsync(CancellationToken cancellationToken = default);
    Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken = default);
}
IDatabaseReader - Database reader abstraction:
public interface IDatabaseReader : IAsyncDisposable
{
    bool HasRows { get; }
    int FieldCount { get; }
    string GetName(int ordinal);
    Type GetFieldType(int ordinal);
    Task<bool> ReadAsync(CancellationToken cancellationToken = default);
    Task<bool> NextResultAsync(CancellationToken cancellationToken = default);
    T? GetFieldValue<T>(int ordinal);
    bool IsDBNull(int ordinal);
}
IDatabaseWriter<T> - Database writer abstraction:
public interface IDatabaseWriter<T>
{
    Task WriteAsync(T item, CancellationToken cancellationToken = default);
    Task WriteBatchAsync(IEnumerable<T> items, CancellationToken cancellationToken = default);
    Task FlushAsync(CancellationToken cancellationToken = default);
}
IDatabaseMapper<T> - Database mapper abstraction:
public interface IDatabaseMapper<T>
{
    T MapFromReader(IDatabaseReader reader);
    IEnumerable<DatabaseParameter> MapToParameters(T item);
}
Base Classes
DatabaseSourceNode<TReader, T> - Base class for database source nodes:
public abstract class DatabaseSourceNode<TReader, T> : SourceNode<T>
    where TReader : IDatabaseReader
{
    protected abstract Task<IDatabaseConnection> GetConnectionAsync(CancellationToken cancellationToken);
    protected abstract Task<TReader> ExecuteQueryAsync(IDatabaseConnection connection, CancellationToken cancellationToken);
    protected abstract T MapRow(TReader reader);
    protected virtual bool StreamResults => false;
    protected virtual int FetchSize => 100;
    protected virtual DeliverySemantic DeliverySemantic => DeliverySemantic.AtLeastOnce;
    protected virtual CheckpointStrategy CheckpointStrategy => CheckpointStrategy.None;
}
DatabaseSinkNode<T> - Base class for database sink nodes:
public abstract class DatabaseSinkNode<T> : SinkNode<T>
{
    protected abstract Task<IDatabaseConnection> GetConnectionAsync(CancellationToken cancellationToken);
    protected abstract Task<IDatabaseWriter<T>> CreateWriterAsync(IDatabaseConnection connection, CancellationToken cancellationToken);
    protected virtual bool UseTransaction => false;
    protected virtual int BatchSize => 100;
    protected virtual DeliverySemantic DeliverySemantic => DeliverySemantic.AtLeastOnce;
    protected virtual CheckpointStrategy CheckpointStrategy => CheckpointStrategy.None;
    protected virtual bool ContinueOnError => false;
}
DatabaseConfigurationBase - Base configuration class:
public abstract class DatabaseConfigurationBase
{
    public string ConnectionString { get; set; } = string.Empty;
    public int CommandTimeout { get; set; } = 30;
    public int ConnectionTimeout { get; set; } = 15;
    public int MinPoolSize { get; set; } = 1;
    public int MaxPoolSize { get; set; } = 100;
    public bool ValidateIdentifiers { get; set; } = true;
    public DeliverySemantic DeliverySemantic { get; set; } = DeliverySemantic.AtLeastOnce;
    public CheckpointStrategy CheckpointStrategy { get; set; } = CheckpointStrategy.None;
    public virtual void Validate() { } // override to add provider-specific checks
}
Configuration Enums
DeliverySemantic - Delivery semantics for database operations:
public enum DeliverySemantic
{
    AtLeastOnce,  // Items may be delivered multiple times but never lost
    AtMostOnce,   // Items may be lost but never delivered multiple times
    ExactlyOnce   // Items are delivered exactly once using transactional semantics
}
CheckpointStrategy - Checkpoint strategies for recovery:
public enum CheckpointStrategy
{
    None,      // No checkpointing
    InMemory,  // In-process checkpoint state (lost on restart)
    Offset,    // Offset-based checkpointing using a monotonic column
    KeyBased,  // Key-based checkpointing using composite keys
    Cursor,    // Cursor-based checkpointing
    CDC        // Change Data Capture checkpointing (WAL/LSN position)
}
Utilities
DatabaseRetryPolicy - Retry policy for transient errors:
var retryPolicy = new DatabaseRetryPolicy
{
    MaxRetryAttempts = 3,
    InitialDelay = TimeSpan.FromSeconds(1),
    MaxDelay = TimeSpan.FromSeconds(30),
    ShouldRetry = ex => DatabaseErrorClassifier.IsTransientError(ex)
};

var result = await retryPolicy.ExecuteAsync(async ct =>
    await ExecuteDatabaseOperation(ct));
DatabaseErrorClassifier - Error classification:
bool isTransient = DatabaseErrorClassifier.IsTransientError(exception);
bool isConnectionError = DatabaseErrorClassifier.IsConnectionError(exception);
bool isMappingError = DatabaseErrorClassifier.IsMappingError(exception);
bool isConstraintViolation = DatabaseErrorClassifier.IsConstraintViolation(exception);
bool isSyntaxError = DatabaseErrorClassifier.IsSyntaxError(exception);
DatabaseConnectionStringBuilder - Connection string utilities:
// Build connection string from parameters
var parameters = new Dictionary<string, string>
{
    ["Server"] = "localhost",
    ["Database"] = "mydb",
    ["Port"] = "5432"
};
var connectionString = DatabaseConnectionStringBuilder.BuildConnectionString(parameters);
// Parse connection string into parameters
var parsed = DatabaseConnectionStringBuilder.ParseConnectionString(connectionString);
DatabaseIdentifierValidator - SQL injection prevention:
// Validate identifier
if (DatabaseIdentifierValidator.IsValidIdentifier(tableName))
{
    // Safe to use in SQL
}
// Quote identifier for safe SQL usage
var quoted = DatabaseIdentifierValidator.QuoteIdentifier(tableName, "\"");
// Validate and throw if invalid
DatabaseIdentifierValidator.ValidateIdentifier(tableName, nameof(tableName));
Exceptions
DatabaseExceptionBase - Base exception class:
public abstract class DatabaseExceptionBase : Exception
{
    public string? ErrorCode { get; }
    public int? SqlState { get; }
}
Specific Exception Types:
- DatabaseException - Generic database errors
- DatabaseConnectionException - Connection-related errors
- DatabaseMappingException - Mapping errors that carry the failing property name
- DatabaseOperationException - Operation errors that carry the error code and SQL state
- DatabaseParameter - Record type describing database parameters (defined alongside the exceptions, but not itself an exception)
Dependency Injection
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Connectors.DependencyInjection;
// Add database options
var services = new ServiceCollection();
services.AddDatabaseOptions(options =>
{
    options.DefaultConnectionString = "Server=localhost;Database=mydb;";
    options.NamedConnections["ReadOnly"] = "Server=localhost;Database=mydb;ReadOnly=true;";
});
// Add database options from configuration
services.AddDatabaseOptions<MyDatabaseOptions>("Database");
Database Storage Providers
NPipeline.Connectors ecosystem includes database storage providers that enable environment-aware configuration through URI-based connections. This approach allows seamless switching between local development databases and cloud-hosted databases (e.g., AWS RDS, Azure SQL) by simply changing a URI.
PostgreSQL URI Format
postgres://user:pass@host:port/database?sslmode=require
SQL Server URI Format
mssql://user:pass@host:port/database?encrypt=true
Environment Switching Example
// Development environment
var devUri = StorageUri.Parse("postgres://localhost:5432/mydb?username=postgres&password=devpass");
// Production environment (AWS RDS)
var prodUri = StorageUri.Parse("postgres://mydb.prod.ap-southeast-2.rds.amazonaws.com:5432/mydb?username=produser&password=${DB_PASSWORD}");
// Same pipeline code works in both environments
var source = new PostgresSourceNode<Customer>(uri: devUri, query: "SELECT * FROM customers");
// Switch to production by changing the URI
var prodSource = new PostgresSourceNode<Customer>(uri: prodUri, query: "SELECT * FROM customers");
Benefits
- Environment-Aware Configuration: Store database URIs in configuration files (appsettings.json, environment variables)
- Easy Switching: Change environments without code modifications
- Unified API: Consistent interface across different database systems
- Secure Credential Management: Use environment variable expansion for passwords
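As an illustration of the `${VAR}` placeholder pattern shown above, the sketch below expands placeholders from a lookup table using only the BCL. The helper name and regex are hypothetical; the actual expansion is performed by the connector configuration, not by user code like this.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical stand-in for the connector's ${VAR} expansion, using a
// supplied lookup instead of real environment variables.
static string ExpandPlaceholders(string input, Func<string, string?> lookup) =>
    Regex.Replace(input, @"\$\{([A-Za-z_][A-Za-z0-9_]*)\}",
        m => lookup(m.Groups[1].Value) ?? m.Value);

var secrets = new Dictionary<string, string> { ["DB_PASSWORD"] = "s3cret" };
var uri = ExpandPlaceholders(
    "postgres://prod-host:5432/mydb?username=produser&password=${DB_PASSWORD}",
    name => secrets.TryGetValue(name, out var v) ? v : null);

Console.WriteLine(uri); // ...password=s3cret
```

In production the lookup would typically be `Environment.GetEnvironmentVariable`, keeping the secret out of configuration files.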
Connector-Specific Documentation
For detailed URI parameters and usage examples specific to each database connector, see the README of the corresponding connector package (for example, NPipeline.Connectors.PostgreSQL or NPipeline.Connectors.SqlServer).
Supported Storage Schemes
NPipeline.Connectors supports an extensible set of storage schemes through its provider architecture:
Built-in Schemes
- file - Local file system access (Windows, Linux, macOS)
  - Absolute paths: file:///C:/data/input.csv
  - Relative paths: file://./data/input.csv
  - UNC paths: file://server/share/data/input.csv
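The BCL's System.Uri parses the same URI forms and shows the scheme/host split that scheme-based provider selection builds on. This sketch uses only the BCL, not NPipeline's StorageUri.

```csharp
using System;

// file URIs in the forms listed above, parsed with the BCL.
var absolute = new Uri("file:///C:/data/input.csv");
var unc = new Uri("file://server/share/data/input.csv");

Console.WriteLine(absolute.Scheme); // file
Console.WriteLine(unc.Host);        // server

// A custom scheme such as s3 parses the same way, which is what makes
// scheme-based provider dispatch possible.
var s3 = new Uri("s3://my-bucket/data/input.csv");
Console.WriteLine($"{s3.Scheme} {s3.Host}"); // s3 my-bucket
```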
Extensible Scheme Support
Additional schemes can be supported by implementing custom storage providers:
- s3 - Amazon S3 and S3-compatible storage
- azure - Microsoft Azure Blob Storage
- gcs - Google Cloud Storage
- ftp - FTP/FTPS servers
- sftp - SFTP servers
- http/https - HTTP/HTTPS endpoints
- database - Database storage (custom implementations)
Usage Examples
Basic File System Access
using NPipeline.Connectors;
using NPipeline.StorageProviders;
// Create a file URI from a path
var inputUri = StorageUri.FromFilePath("./data/input.csv");
var outputUri = StorageUri.FromFilePath("./data/output.csv");
// Create resolver with file system provider
var resolver = StorageProviderFactory.CreateResolver();
var provider = StorageProviderFactory.GetProviderOrThrow(resolver, inputUri);
// Read from file
using var inputStream = await provider.OpenReadAsync(inputUri);
using var reader = new StreamReader(inputStream);
var content = await reader.ReadToEndAsync();
// Write to file
using var outputStream = await provider.OpenWriteAsync(outputUri);
using var writer = new StreamWriter(outputStream);
await writer.WriteAsync(content.ToUpperInvariant());
Provider Registration with Dependency Injection
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Connectors;
using NPipeline.Connectors.DependencyInjection;
// Configure services
var services = new ServiceCollection();
// Add the storage resolver with file system provider
services.AddStorageResolver(includeFileSystem: true);
// Add custom storage providers
services.AddStorageProvider<S3StorageProvider>();
services.AddStorageProvider<AzureBlobStorageProvider>();
// Add provider instance
services.AddStorageProvider(new CustomDatabaseStorageProvider(connectionString));
// Build service provider
var serviceProvider = services.BuildServiceProvider();
// Resolve and use the storage resolver
var resolver = serviceProvider.GetRequiredService<IStorageResolver>();
var s3Uri = StorageUri.Parse("s3://my-bucket/data/input.csv");
var provider = resolver.ResolveProvider(s3Uri);
Custom Provider Example (S3)
using NPipeline.Connectors.Abstractions;
public class S3StorageProvider : IStorageProvider
{
    public StorageScheme Scheme => StorageScheme.S3;

    public bool CanHandle(StorageUri uri)
    {
        return Scheme.Equals(uri.Scheme) && !string.IsNullOrEmpty(uri.Host);
    }

    public async Task<Stream> OpenReadAsync(StorageUri uri, CancellationToken cancellationToken = default)
    {
        // Read the object from S3 and hand back its response stream
        var client = GetS3Client();
        var request = new GetObjectRequest
        {
            BucketName = uri.Host,
            Key = uri.Path.TrimStart('/')
        };
        var response = await client.GetObjectAsync(request, cancellationToken);
        return response.ResponseStream;
    }

    public async Task<Stream> OpenWriteAsync(StorageUri uri, CancellationToken cancellationToken = default)
    {
        // Return a stream that uploads to S3 when disposed
        var client = GetS3Client();
        var request = new PutObjectRequest
        {
            BucketName = uri.Host,
            Key = uri.Path.TrimStart('/'),
            InputStream = new MemoryStream() // Will be replaced with the actual stream
        };
        return new S3UploadStream(client, request, cancellationToken);
    }

    // Implement other required methods...
}
Configuration
Provider Registration
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Connectors.DependencyInjection;
var services = new ServiceCollection();
// Method 1: Register individual providers
services.AddStorageProvider<FileSystemStorageProvider>();
services.AddStorageProvider<S3StorageProvider>();
services.AddStorageResolver(includeFileSystem: false); // Skip auto-registration
// Method 2: Register from configuration
services.AddStorageProvidersFromConfiguration(config =>
{
    config.Providers["S3"] = new StorageProviderConfig
    {
        ProviderType = "MyApp.Providers.S3StorageProvider",
        Enabled = true,
        Settings = new Dictionary<string, string>
        {
            ["Region"] = "us-west-2",
            ["AccessKey"] = "${S3_ACCESS_KEY}",
            ["SecretKey"] = "${S3_SECRET_KEY}"
        }
    };
});

// Method 3: Register all discovered providers
services.AddConnectorsFromConfiguration(config =>
{
    config.DefaultScheme = "file";
    // Configure providers as needed
});
Configurable Provider Implementation
using NPipeline.Connectors.Configuration;
using NPipeline.Connectors.Abstractions;
public class ConfigurableStorageProvider : IStorageProvider, IConfigurableStorageProvider
{
    public StorageScheme Scheme { get; private set; } = StorageScheme.Custom;

    public void Configure(IReadOnlyDictionary<string, string> settings)
    {
        // Apply configuration settings
        if (settings.TryGetValue("Scheme", out var scheme))
            Scheme = new StorageScheme(scheme);

        // Configure other properties...
    }

    // Implement IStorageProvider methods...
}
// Register with configuration
services.AddStorageProvidersFromConfiguration(config =>
{
    config.Providers["Custom"] = new StorageProviderConfig
    {
        ProviderType = "MyApp.Providers.ConfigurableStorageProvider",
        Enabled = true,
        Settings = new Dictionary<string, string>
        {
            ["Scheme"] = "custom",
            ["ConnectionString"] = "Server=myserver;Database=mydb;"
        }
    };
});
Performance Considerations
Stream Usage
- Always dispose streams properly to release resources
- Use appropriate buffer sizes for large file operations
- Consider using FileStream with FileOptions.SequentialScan for sequential reads
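For example, a sequential read with an explicit buffer size and the SequentialScan hint might look like this; it is plain BCL code, independent of NPipeline:

```csharp
using System;
using System.IO;

// Write a 64 KiB scratch file, then read it back sequentially.
var path = Path.Combine(Path.GetTempPath(), "npipeline-stream-demo.bin");
await File.WriteAllBytesAsync(path, new byte[64 * 1024]);

long total = 0;
await using (var stream = new FileStream(
    path, FileMode.Open, FileAccess.Read, FileShare.Read,
    bufferSize: 81920, // the same default Stream.CopyToAsync uses
    options: FileOptions.Asynchronous | FileOptions.SequentialScan))
{
    var buffer = new byte[8192];
    int read;
    while ((read = await stream.ReadAsync(buffer)) > 0)
        total += read;
}

Console.WriteLine(total); // 65536
File.Delete(path);
```

FileOptions.SequentialScan is only a hint to the OS cache manager; correctness is unaffected if the hint is ignored.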
Provider Resolution
- Provider resolution is cached after first use for performance
- Register providers explicitly to avoid reflection overhead
- Use scheme-specific providers when possible for better performance
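The cached-resolution behavior can be pictured as a scheme-to-provider map that pays the lookup cost once per scheme. This is an illustrative sketch, not the package's actual implementation:

```csharp
using System;
using System.Collections.Concurrent;

var resolutions = 0;
var cache = new ConcurrentDictionary<string, string>();

// Stand-in for the expensive scan over registered providers.
string Resolve(string scheme)
{
    resolutions++;
    return scheme switch
    {
        "file" => "FileSystemStorageProvider",
        "s3" => "S3StorageProvider",
        _ => throw new NotSupportedException(scheme),
    };
}

var first = cache.GetOrAdd("file", Resolve);
var second = cache.GetOrAdd("file", Resolve); // served from the cache

Console.WriteLine($"{first} == {second}, resolutions = {resolutions}"); // resolutions = 1
```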
Async Operations
- All I/O operations are async-first to prevent thread pool starvation
- Use ConfigureAwait(false) in library code to avoid deadlocks
- Pass cancellation tokens to long-running operations
Memory Management
- Stream-based operations minimize memory usage for large files
- Avoid loading entire files into memory when possible
- Use appropriate buffer sizes based on typical file sizes
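As a sketch of the streaming approach, the BCL's CopyToAsync moves data in fixed-size chunks, so peak memory stays at the buffer size no matter how large the input is:

```csharp
using System;
using System.IO;

// A 1 MB in-memory stream stands in for a large input file.
var source = new MemoryStream(new byte[1_000_000]);
var destination = new MemoryStream();

// Copies in 80 KB chunks; extra memory is the buffer, not the file size.
await source.CopyToAsync(destination, bufferSize: 81920);

Console.WriteLine(destination.Length); // 1000000
```

Contrast this with File.ReadAllBytesAsync, which allocates the entire file up front.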
Related Packages
- NPipeline - Core pipeline framework
- NPipeline.Analyzers - Roslyn analyzers for pipeline development
- NPipeline.Extensions - Additional pipeline components and utilities
License
MIT License - see LICENSE file for details.
| Product | Compatible frameworks |
|---|---|
| .NET | net8.0, net9.0, net10.0 (platform-specific target frameworks such as net8.0-windows are computed) |
Dependencies (net8.0, net9.0, net10.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.3)
- NPipeline (>= 0.19.0)
- NPipeline.StorageProviders (>= 0.19.0)
NuGet packages (9)
Showing the top 5 NuGet packages that depend on NPipeline.Connectors:
| Package | Description |
|---|---|
| NPipeline.Connectors.Csv | CSV source and sink nodes for NPipeline using CsvHelper - read and write CSV files with configurable options |
| NPipeline.Connectors.Excel | Excel source and sink nodes for NPipeline using ExcelDataReader - read and write Excel files (XLS, XLSX) with configurable options |
| NPipeline.Connectors.PostgreSQL | PostgreSQL source and sink nodes for NPipeline using Npgsql - read from and write to PostgreSQL databases with configurable options |
| NPipeline.Connectors.SqlServer | SQL Server source and sink nodes for NPipeline using Microsoft.Data.SqlClient - read from and write to SQL Server databases with configurable options |
| NPipeline.Connectors.Json | JSON source and sink nodes for NPipeline using System.Text.Json - read and write JSON files with configurable options |