Antaeus.CandleFlow.Pipeline
1.0.0
dotnet add package Antaeus.CandleFlow.Pipeline --version 1.0.0
CandleFlow
A high-performance .NET 8 library for aggregating tick data into OHLCV candles across multiple timeframes.
Features
- Multi-Timeframe Aggregation: Aggregate ticks into any timeframe (1m, 5m, 15m, 1H, 4H, 1D, 1W, or custom)
- Real-Time Updates: Subscribe to candle updates via IObservable<T> (System.Reactive)
- Batch Processing: Efficient batch import of historical tick data
- Thread-Safe: Designed for high-throughput scenarios (1000+ ticks/second)
- Persistence: EF Core implementation for storing candles and aggregator state
- Retention Policies: Configurable count-based and time-based retention
- Gap Handling: Automatic forward-fill of empty candles during market gaps
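The gap-handling feature above can be illustrated with a small standalone sketch. It assumes that forward-fill means emitting flat, zero-volume candles at the previous close for every missing period; that is a common convention, not confirmed behavior of CandleFlow. `Bar` and `GapFill.ForwardFill` are hypothetical names used only for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var step = TimeSpan.FromMinutes(1);
var t0 = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var sparse = new List<Bar>
{
    new(t0, 100m, 101m, 99m, 100.5m, 3m),
    new(t0.AddMinutes(3), 102m, 103m, 101m, 102.5m, 1m), // minutes 1 and 2 are missing
};

foreach (var bar in GapFill.ForwardFill(sparse, step))
    Console.WriteLine($"{bar.OpenTime:HH:mm} C={bar.Close} V={bar.Volume}");

// Hypothetical stand-in for a candle; not the library's Candle type.
public record Bar(DateTime OpenTime, decimal Open, decimal High, decimal Low, decimal Close, decimal Volume);

public static class GapFill
{
    // Assumes bars are sorted by OpenTime and aligned to multiples of step.
    public static IEnumerable<Bar> ForwardFill(IEnumerable<Bar> bars, TimeSpan step)
    {
        if (step <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(step));
        Bar? previous = null;
        foreach (var bar in bars)
        {
            if (previous is not null)
            {
                // Emit one flat candle per missing period, carrying the last close forward.
                for (var t = previous.OpenTime + step; t < bar.OpenTime; t += step)
                {
                    var c = previous.Close;
                    yield return new Bar(t, c, c, c, c, 0m);
                }
            }
            yield return bar;
            previous = bar;
        }
    }
}
```

Filled candles carry zero volume, so downstream volume totals are unaffected by the gap.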
Installation
dotnet add package Antaeus.CandleFlow.Core
dotnet add package Antaeus.CandleFlow.Aggregation
dotnet add package Antaeus.CandleFlow.Persistence.EfCore # Optional: for persistence
Quick Start
Basic Usage
using Antaeus.CandleFlow.Aggregation;
using Antaeus.CandleFlow.Core.Entities;
// Create the aggregator with options
var options = new CandleAggregatorOptions
{
    DefaultTimeFrames = new[] { TimeFrame.FiveMinutes, TimeFrame.OneHour }
};
using var aggregator = new CandleAggregator(options);
// Add a symbol to track
aggregator.AddSymbol("BTC/USD");
// Process ticks
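// Named arguments must match the record's parameter names (Price, Volume), which are case-sensitive
var tick = new Tick(DateTime.UtcNow, Price: 40000m, Volume: 1.5m, TradeDirection.Buy);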
aggregator.ProcessTick("BTC/USD", tick);
// Get current candle
var currentCandle = aggregator.GetCurrentCandle("BTC/USD", TimeFrame.FiveMinutes);
Console.WriteLine($"Current: O={currentCandle.Open} H={currentCandle.High} L={currentCandle.Low} C={currentCandle.Close}");
// Get closed candles
var closedCandles = aggregator.GetClosedCandles("BTC/USD", TimeFrame.FiveMinutes, count: 10);
Dependency Injection
using Antaeus.CandleFlow.Aggregation;
using Antaeus.CandleFlow.Core.Entities; // TimeFrame
// In your Startup/Program.cs
services.AddCandleFlow(options =>
{
    options.DefaultTimeFrames = new[] { TimeFrame.OneMinute, TimeFrame.FiveMinutes, TimeFrame.OneHour };
    options.EnableBatchOptimization = true;
    options.BatchThreshold = 100;
});
// Inject ICandleAggregator where needed
public class MyService
{
    private readonly ICandleAggregator _aggregator;

    public MyService(ICandleAggregator aggregator)
    {
        _aggregator = aggregator;
    }
}
Subscribing to Updates
// Subscribe to all updates (tick-by-tick)
using var subscription = aggregator.Subscribe("BTC/USD", TimeFrame.FiveMinutes)
    .Subscribe(update =>
    {
        Console.WriteLine($"Update: {update.UpdateType} - Close: {update.Candle.Close}");
        update.Acknowledge(); // Mark as processed
    });
// Subscribe to closed candles only
using var closedSub = aggregator.SubscribeToClosedOnly("BTC/USD", TimeFrame.FiveMinutes)
    .Subscribe(update =>
    {
        Console.WriteLine($"Candle closed: {update.Candle.OpenTime} - OHLC: {update.Candle.Open}/{update.Candle.High}/{update.Candle.Low}/{update.Candle.Close}");
    });
Batch Processing
// Process historical tick data efficiently
var historicalTicks = LoadTicksFromDatabase(); // Your data source
aggregator.ProcessBatch("BTC/USD", historicalTicks);
Custom Timeframes
// Use predefined timeframes
aggregator.AddSymbol("ETH/USD", TimeFrame.OneMinute, TimeFrame.FiveMinutes, TimeFrame.OneHour);
// Or create custom timeframes
var sevenMinutes = new TimeFrame(TimeSpan.FromMinutes(7), "7m");
var twoHours = new TimeFrame(TimeSpan.FromHours(2), "2H");
aggregator.AddSymbol("SOL/USD", sevenMinutes, twoHours);
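A custom duration such as 7 minutes does not divide an hour evenly, so candle boundaries depend on where counting starts. The sketch below shows one common convention: flooring the UTC timestamp to a multiple of the duration counted from the Unix epoch. Whether CandleFlow anchors its buckets this way is an assumption, and `BucketMath.FloorToBucket` is a hypothetical helper, not part of the library.

```csharp
using System;

var ts = new DateTime(2025, 1, 1, 10, 13, 42, DateTimeKind.Utc);
var open = BucketMath.FloorToBucket(ts, TimeSpan.FromMinutes(7));
Console.WriteLine($"{ts:O} falls in the bucket opening at {open:O}");

public static class BucketMath
{
    // Hypothetical helper: floors a UTC timestamp to the start of its bucket,
    // counting buckets from the Unix epoch. CandleFlow's actual anchor may differ.
    public static DateTime FloorToBucket(DateTime utc, TimeSpan size)
    {
        if (size <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(size));
        long offset = (utc - DateTime.UnixEpoch).Ticks;
        long floored = offset - (offset % size.Ticks); // valid for timestamps at or after the epoch
        return DateTime.UnixEpoch.AddTicks(floored);
    }
}
```

With an epoch anchor, timeframes that divide a day evenly (1m, 5m, 1H) still open on the expected wall-clock boundaries in UTC.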
Core Concepts
Tick
A Tick represents a single trade or price update:
public record Tick(
    DateTime Timestamp,       // UTC timestamp
    decimal Price,            // Trade price
    decimal Volume,           // Trade volume
    TradeDirection Direction  // Buy, Sell, or Unknown
);
Candle
A Candle represents an OHLCV bar:
- Open: First price in the period
- High: Highest price in the period
- Low: Lowest price in the period
- Close: Last price in the period
- Volume: Total volume in the period
- TickCount: Number of ticks aggregated
- IsClosed: Whether the candle period has ended
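The field definitions above amount to a simple fold over the ticks of one period. The following standalone sketch shows that fold; `SketchTick`, `SketchCandle`, and `Ohlcv.Build` are hypothetical stand-ins for illustration and are not the library's own types or methods.

```csharp
using System;
using System.Collections.Generic;

// Demo: fold three ticks from the same period into one candle.
var ticks = new List<SketchTick>
{
    new(new DateTime(2025, 1, 1, 0, 0, 5, DateTimeKind.Utc), 100m, 1.0m),
    new(new DateTime(2025, 1, 1, 0, 1, 0, DateTimeKind.Utc), 105m, 0.5m),
    new(new DateTime(2025, 1, 1, 0, 2, 30, DateTimeKind.Utc), 98m, 2.0m),
};
var candle = Ohlcv.Build(ticks);
Console.WriteLine($"O={candle.Open} H={candle.High} L={candle.Low} C={candle.Close} V={candle.Volume} N={candle.TickCount}");

// Hypothetical stand-ins; not the library's Tick and Candle types.
public record SketchTick(DateTime Timestamp, decimal Price, decimal Volume);
public record SketchCandle(decimal Open, decimal High, decimal Low, decimal Close, decimal Volume, int TickCount);

public static class Ohlcv
{
    // Assumes ticks are non-empty, belong to one period, and are sorted by timestamp.
    public static SketchCandle Build(IReadOnlyList<SketchTick> ticks)
    {
        if (ticks.Count == 0) throw new ArgumentException("At least one tick is required.", nameof(ticks));
        var first = ticks[0];
        decimal high = first.Price, low = first.Price, volume = 0m;
        foreach (var t in ticks)
        {
            high = Math.Max(high, t.Price);   // High: highest price seen
            low = Math.Min(low, t.Price);     // Low: lowest price seen
            volume += t.Volume;               // Volume: running total
        }
        var last = ticks[ticks.Count - 1];
        return new SketchCandle(first.Price, high, low, last.Price, volume, ticks.Count);
    }
}
```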
TimeFrame
Predefined timeframes:
- TimeFrame.OneMinute (1m)
- TimeFrame.FiveMinutes (5m)
- TimeFrame.FifteenMinutes (15m)
- TimeFrame.ThirtyMinutes (30m)
- TimeFrame.OneHour (1H)
- TimeFrame.FourHours (4H)
- TimeFrame.OneDay (1D)
- TimeFrame.OneWeek (1W)
Persistence (Optional)
Setup with EF Core
using Microsoft.EntityFrameworkCore; // UseSqlServer comes from the Microsoft.EntityFrameworkCore.SqlServer package
// Add DbContext
services.AddDbContext<CandleFlowDbContext>(options =>
    options.UseSqlServer(connectionString));
// Register repositories
services.AddScoped<ICandleRepository, EfCoreCandleRepository>();
services.AddScoped<IAggregatorStateRepository, EfCoreAggregatorStateRepository>();
Saving Candles
var repository = serviceProvider.GetRequiredService<ICandleRepository>();
// Save a single candle
await repository.SaveAsync(candle);
// Save multiple candles
await repository.SaveBatchAsync(candles);
// Query candles
var recentCandles = await repository.GetRangeAsync(
    "BTC/USD",
    TimeFrame.FiveMinutes,
    DateTime.UtcNow.AddDays(-1),
    DateTime.UtcNow);
Retention Policies
using Antaeus.CandleFlow.Aggregation.Retention;
// Count-based retention (keep last N candles)
var countPolicy = new CountBasedRetention(maxCount: 1000);
// Time-based retention (keep candles younger than max age)
var timePolicy = new TimeBasedRetention(maxAge: TimeSpan.FromDays(30));
// Composite policy (both conditions must be met)
var composite = new CompositeRetention(countPolicy, timePolicy, requireAll: true);
// Use with retention manager
var manager = new RetentionManager(composite);
manager.TrackCandle(candle);
manager.Acknowledge(candle);
var evicted = manager.ApplyRetention();
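How a count limit and an age limit combine is easier to see in isolation. The sketch below takes one reading of `requireAll`: a candle is evicted only when every policy would evict it, and when `requireAll` is false a single policy is enough. That reading is an assumption, and `RetentionSketch.SelectEvictions` is a hypothetical helper, not the library's `CompositeRetention`.

```csharp
using System;
using System.Collections.Generic;

var now = new DateTime(2025, 1, 31, 0, 0, 0, DateTimeKind.Utc);
// Candle open times, newest first: ages of 1, 10, and 40 days.
var openTimes = new[] { now.AddDays(-1), now.AddDays(-10), now.AddDays(-40) };

var evicted = RetentionSketch.SelectEvictions(
    openTimes, now, maxCount: 2, maxAge: TimeSpan.FromDays(30), requireAll: true);
Console.WriteLine($"Evicted {evicted.Count} candle(s)");

public static class RetentionSketch
{
    // Hypothetical helper. The input must be sorted newest first.
    public static List<DateTime> SelectEvictions(
        IReadOnlyList<DateTime> openTimesNewestFirst,
        DateTime now, int maxCount, TimeSpan maxAge, bool requireAll)
    {
        var result = new List<DateTime>();
        for (int i = 0; i < openTimesNewestFirst.Count; i++)
        {
            bool overCount = i >= maxCount;                        // count-based rule
            bool tooOld = now - openTimesNewestFirst[i] > maxAge;  // time-based rule
            bool evict = requireAll ? overCount && tooOld : overCount || tooOld;
            if (evict) result.Add(openTimesNewestFirst[i]);
        }
        return result;
    }
}
```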
Architecture
Antaeus.CandleFlow.Core
├── Entities/
│ ├── Tick.cs
│ ├── Candle.cs
│ ├── TimeFrame.cs
│ └── TradeDirection.cs
Antaeus.CandleFlow.Aggregation
├── ICandleAggregator.cs
├── CandleAggregator.cs
├── CandleAggregatorOptions.cs
├── Building/
│ ├── ICandleBuilder.cs
│ └── CandleBuilder.cs
├── Hierarchy/
│ ├── ITimeFrameHierarchy.cs
│ └── TimeFrameHierarchy.cs
├── Subscriptions/
│ ├── ICandleUpdate.cs
│ └── SubscriptionManager.cs
├── Retention/
│ ├── IRetentionPolicy.cs
│ └── RetentionManager.cs
└── Threading/
    ├── CandleStore.cs
    └── AsyncTickProcessor.cs
Antaeus.CandleFlow.Persistence.EfCore
├── ICandleRepository.cs
├── EfCoreCandleRepository.cs
├── IAggregatorStateRepository.cs
└── EfCoreAggregatorStateRepository.cs
Performance
CandleFlow is designed for high-throughput scenarios:
- Thread-safe with ReaderWriterLockSlim for candle updates
- Channel<T> for async tick processing
- Batch processing with deferred cascade updates
- Efficient memory usage with configurable retention
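The Channel<T> approach listed above follows a standard producer/consumer pattern from System.Threading.Channels. The sketch below is a generic illustration of that pattern, not the implementation of `AsyncTickProcessor`; the bounded capacity of 1024 and the running high/low state are arbitrary choices for the example.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<decimal>(new BoundedChannelOptions(1024)
{
    SingleReader = true,                    // one consumer owns the aggregation state
    FullMode = BoundedChannelFullMode.Wait  // apply backpressure instead of dropping prices
});

// Consumer: drains the channel and keeps a running high and low.
var consumer = Task.Run(async () =>
{
    decimal high = decimal.MinValue, low = decimal.MaxValue;
    await foreach (var price in channel.Reader.ReadAllAsync())
    {
        high = Math.Max(high, price);
        low = Math.Min(low, price);
    }
    return (high, low);
});

// Producer: any number of threads may write concurrently.
foreach (var price in new[] { 100m, 104m, 97m, 101m })
    await channel.Writer.WriteAsync(price);
channel.Writer.Complete(); // signals the consumer that no more prices will arrive

var (h, l) = await consumer;
Console.WriteLine($"High={h} Low={l}");
```

Because a single reader owns the mutable state, the consumer loop needs no lock; locking is only required where other threads read the results while aggregation is still running.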
Benchmarks (typical):
- Single tick processing: ~5 microseconds
- Batch processing: 100,000 ticks in < 2 seconds
- Memory: ~500 bytes per candle
License
MIT License - see LICENSE file for details.
Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies for net8.0:
- Antaeus.CandleFlow.Aggregation (>= 1.2.0)
- Antaeus.CandleFlow.Core (>= 1.2.0)
- Antaeus.CandleFlow.Persistence.EfCore (>= 1.2.0)
- Antaeus.CandleFlow.Persistence.Parquet (>= 1.2.0)
- Antaeus.CandleFlow.Persistence.QuestDB (>= 1.2.0)
- Antaeus.CandleFlow.Persistence.Tiered (>= 1.2.0)
- AspNetCore.HealthChecks.Kafka (>= 9.0.0)
- MassTransit (>= 8.4.0)
- MassTransit.Kafka (>= 8.4.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.0)
- OpenTelemetry.Exporter.OpenTelemetryProtocol (>= 1.11.2)
- OpenTelemetry.Extensions.Hosting (>= 1.11.2)
- OpenTelemetry.Instrumentation.Process (>= 0.5.0-beta.7)
- OpenTelemetry.Instrumentation.Runtime (>= 1.11.1)
- Polly (>= 8.5.2)
- Polly.Extensions (>= 8.5.2)
- System.Text.Json (>= 10.0.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 148 | 11/22/2025 |