Rystem.Queue
10.0.7
dotnet add package Rystem.Queue --version 10.0.7
NuGet\Install-Package Rystem.Queue -Version 10.0.7
<PackageReference Include="Rystem.Queue" Version="10.0.7" />
<PackageVersion Include="Rystem.Queue" Version="10.0.7" />
<PackageReference Include="Rystem.Queue" />
paket add Rystem.Queue --version 10.0.7
#r "nuget: Rystem.Queue, 10.0.7"
#:package Rystem.Queue@10.0.7
#addin nuget:?package=Rystem.Queue&version=10.0.7
#tool nuget:?package=Rystem.Queue&version=10.0.7
What is Rystem?
Rystem.Queue
Rystem.Queue is a batching queue library built on top of Rystem.BackgroundJob.
Items are stored in an IQueue<T> implementation and periodically flushed to an IQueueManager<T> implementation. The package includes in-memory FIFO and LIFO backends, plus an integration hook for custom queue providers.
It is most useful for:
- batching writes to external systems
- lightweight buffered ingestion
- collecting events before periodic processing
- swapping queue backends while keeping the same enqueue and flush contracts
The best real examples for this package come from the source itself and the unit test in src/Extensions/Queue/Test/Rystem.Queue.UnitTest/QueueTest.cs.
Resources
- Complete Documentation: https://rystem.net
- MCP Server for AI: https://rystem.cloud/mcp
- Discord Community: https://discord.gg/tkWvy4WPjt
- Support the Project: https://www.buymeacoffee.com/keyserdsoze
Installation
dotnet add package Rystem.Queue
The current 10.x package targets net10.0 and builds on top of Rystem.BackgroundJob.
Package Architecture
The package is centered around five pieces.
| Piece | Purpose |
|---|---|
IQueue<T> |
Storage abstraction for queued items |
IQueueManager<T> |
Batch processor invoked when a flush happens |
QueueProperty<T> |
Per-queue settings for thresholds and schedules |
QueueJobManager<T> |
Internal background job that decides when to flush |
AddQueueIntegration(...) |
DI entry point that wires queue, manager, settings, and background job |
The two built-in queue backends are:
MemoryQueue<T>for FIFO behaviorMemoryStackQueue<T>for LIFO behavior
At a high level, the flow is:
- register a queue and a queue manager
- enqueue items through
IQueue<T> - run
WarmUpAsync()so the background job starts - the queue job checks size and schedule conditions
- when a flush occurs, it dequeues items and passes them to
IQueueManager<T>.ManageAsync(...)
Table of Contents
- Package Architecture
- Implement a Queue Manager
- Register a Queue
- QueueProperty
- Flush Behavior and Warm-up
- Using IQueue
- Custom Queue Backends
- Repository Examples
Implement a Queue Manager
IQueueManager<T> is the consumer that receives a flushed batch.
using Rystem.Queue;
public sealed class SampleQueueManager : IQueueManager<Sample>
{
private readonly ILogger<SampleQueueManager> _logger;
public SampleQueueManager(ILogger<SampleQueueManager> logger)
{
_logger = logger;
}
public Task ManageAsync(IEnumerable<Sample> items)
{
_logger.LogInformation("Processing {Count} items", items.Count());
return Task.CompletedTask;
}
}
IQueueManager contract
public interface IQueueManager<in T>
{
Task ManageAsync(IEnumerable<T> items);
}
The manager should be able to process the whole batch in one call.
Dependency injection behavior
AddQueueIntegration(...) registers IQueueManager<T> as Transient.
When a flush happens, QueueJobManager<T> resolves the manager from a fresh DI scope:
var service = _serviceProvider.CreateScope().ServiceProvider.GetService<IQueueManager<T>>();
That means scoped dependencies behave per flush, not for the lifetime of the application. The unit test manager in src/Extensions/Queue/Test/Rystem.Queue.UnitTest/QueueTest.cs demonstrates this by resolving singleton, scoped, and transient dependencies inside the manager.
Register a Queue
In-memory FIFO queue
Register the built-in FIFO queue with AddMemoryQueue<T, TQueueManager>():
services.AddMemoryQueue<Sample, SampleQueueManager>(options =>
{
options.MaximumBuffer = 1000;
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
});
This uses MemoryQueue<T>, which is backed by ConcurrentQueue<T>.
In-memory LIFO stack queue
If you want stack-like behavior instead, use AddMemoryStackQueue<T, TQueueManager>():
services.AddMemoryStackQueue<Sample, SampleQueueManager>(options =>
{
options.MaximumBuffer = 1000;
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
});
This uses MemoryStackQueue<T>, which is backed by ConcurrentStack<T>.
Generic queue integration
The common registration path is:
services.AddQueueIntegration<T, TQueueManager, TQueue>(options =>
{
// configure QueueProperty<T>
});
Internally it registers:
QueueProperty<T>as singletonIQueue<T>as singletonIQueueManager<T>as transientQueueJobManager<T>throughAddBackgroundJob(...)
The background queue worker is configured with:
x.Cron = settings.BackgroundJobCronFormat;
x.RunImmediately = false;
So queue processing always depends on the Rystem.BackgroundJob scheduler and does not run immediately at startup unless the first scheduled tick occurs.
QueueProperty
QueueProperty<T> contains the queue settings:
public sealed class QueueProperty<T>
{
public int MaximumBuffer { get; set; } = 5000;
public string MaximumRetentionCronFormat { get; set; } = "*/1 * * * *";
public string BackgroundJobCronFormat { get; set; } = "*/1 * * * *";
}
| Property | Default | Purpose |
|---|---|---|
MaximumBuffer |
5000 |
Flush when the queued item count goes above this value |
MaximumRetentionCronFormat |
"*/1 * * * *" |
Retention schedule used by QueueJobManager<T> when computing flush timing |
BackgroundJobCronFormat |
"*/1 * * * *" |
How often the background worker checks the queue |
QueueProperty<T> is generic only so it can be registered separately per queue type.
Flush Behavior and Warm-up
Warm-up starts the queue worker
Because the queue worker is implemented as a background job, it starts only after warm-up runs:
var app = builder.Build();
await app.Services.WarmUpAsync();
app.Run();
The unit test does the same with:
serviceProvider.WarmUpAsync().ToResult();
Without warm-up, the scheduled queue flushes never begin.
Buffer threshold
The internal queue worker flushes when:
await _queue.CountAsync() > _property.MaximumBuffer
Note the comparison is > rather than >=.
That is why the test uses 1001 items when MaximumBuffer = 1000:
for (int i = 0; i < 1001; i++)
await queue.AddAsync(new Sample { Id = i.ToString() });
After a short wait, the queue is empty again:
Assert.Equal(0, await queue.CountAsync());
Retention and polling cadence
The queue worker runs on BackgroundJobCronFormat, and that scheduled execution is the outer polling loop for flushes.
Inside the worker, MaximumRetentionCronFormat is parsed with Cronos to compute the next retention occurrence when flush logic runs. In practice, the background job cadence is what determines how often the queue can be inspected, so keep BackgroundJobCronFormat at or below the level of responsiveness you want.
The test-backed example configures:
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
With that setup, the queue is checked every second and flushed during the scheduled processing loop.
Using IQueue
Inject IQueue<T> wherever items should be buffered.
using Rystem.Queue;
public sealed class OrderService
{
private readonly IQueue<Sample> _queue;
public OrderService(IQueue<Sample> queue)
{
_queue = queue;
}
public async Task EnqueueAsync()
{
for (int i = 0; i < 100; i++)
await _queue.AddAsync(new Sample { Id = i.ToString() });
}
}
IQueue contract
public interface IQueue<T>
{
Task AddAsync(T entity);
Task<IEnumerable<T>> DequeueAsync(int? top = null);
Task<IEnumerable<T>> ReadAsync(int? top = null);
Task<int> CountAsync();
}
Typical usage:
await queue.AddAsync(new Sample { Id = "1" });
IEnumerable<Sample> preview = await queue.ReadAsync(top: 10);
IEnumerable<Sample> batch = await queue.DequeueAsync(top: 50);
int count = await queue.CountAsync();
Memory queue semantics
The built-in in-memory implementations behave like this:
MemoryQueue<T>is FIFOMemoryStackQueue<T>is LIFO- both implementations are singleton-backed, so queued items live for the application lifetime unless dequeued
Custom Queue Backends
To plug in your own queue storage, implement IQueue<T> and register it through AddQueueIntegration(...).
using Rystem.Queue;
public sealed class ServiceBusQueue<T> : IQueue<T>
{
public Task AddAsync(T entity) => Task.CompletedTask;
public Task<IEnumerable<T>> ReadAsync(int? top = null) => Task.FromResult(Enumerable.Empty<T>());
public Task<IEnumerable<T>> DequeueAsync(int? top = null) => Task.FromResult(Enumerable.Empty<T>());
public Task<int> CountAsync() => Task.FromResult(0);
}
services.AddQueueIntegration<Sample, SampleQueueManager, ServiceBusQueue<Sample>>(options =>
{
options.MaximumBuffer = 1000;
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
});
When writing a custom backend, keep these semantics aligned with the queue worker:
CountAsync()should reflect the current queued count as accurately as possibleDequeueAsync()should remove the returned itemsReadAsync()should not remove items
Repository Examples
The most useful references for this package are:
- Queue registration entry point: src/Extensions/Queue/Rystem.Queue/ServiceCollectionExtensions/ServiceCollectionExtensions.cs
- Queue settings model: src/Extensions/Queue/Rystem.Queue/Models/QueueProperty.cs
- Queue worker background job: src/Extensions/Queue/Rystem.Queue/BackgroundJob/QueueJobManager.cs
- Queue contract: src/Extensions/Queue/Rystem.Queue/Interfaces/IQueue.cs
- Queue manager contract: src/Extensions/Queue/Rystem.Queue/Interfaces/IQueueManager.cs
- FIFO in-memory backend: src/Extensions/Queue/Rystem.Queue/InMemory/MemoryQueue.cs
- LIFO in-memory backend: src/Extensions/Queue/Rystem.Queue/InMemory/MemoryStackQueue.cs
- Unit test: src/Extensions/Queue/Test/Rystem.Queue.UnitTest/QueueTest.cs
This README is intentionally architecture-first because Rystem.Queue is more than just an in-memory queue. It is a small batching pipeline built from a queue abstraction, a batch manager abstraction, and a scheduled worker from Rystem.BackgroundJob.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Rystem.BackgroundJob (>= 10.0.7)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 10.0.7 | 92 | 3/26/2026 |
| 10.0.6 | 200,054 | 3/3/2026 |
| 10.0.5 | 128 | 2/22/2026 |
| 10.0.4 | 150 | 2/9/2026 |
| 10.0.3 | 147,923 | 1/28/2026 |
| 10.0.1 | 209,136 | 11/12/2025 |
| 9.1.3 | 298 | 9/2/2025 |
| 9.1.2 | 764,537 | 5/29/2025 |
| 9.1.1 | 97,838 | 5/2/2025 |
| 9.0.32 | 186,672 | 4/15/2025 |
| 9.0.31 | 5,827 | 4/2/2025 |
| 9.0.30 | 88,885 | 3/26/2025 |
| 9.0.29 | 9,075 | 3/18/2025 |
| 9.0.28 | 292 | 3/17/2025 |
| 9.0.27 | 292 | 3/16/2025 |
| 9.0.26 | 326 | 3/13/2025 |
| 9.0.25 | 52,166 | 3/9/2025 |
| 9.0.20 | 19,601 | 3/6/2025 |
| 9.0.19 | 350 | 3/6/2025 |
| 9.0.18 | 378 | 3/4/2025 |