Synadia.Orbit.JetStream.Extensions
1.0.0-preview.3
dotnet add package Synadia.Orbit.JetStream.Extensions --version 1.0.0-preview.3
NuGet\Install-Package Synadia.Orbit.JetStream.Extensions -Version 1.0.0-preview.3
<PackageReference Include="Synadia.Orbit.JetStream.Extensions" Version="1.0.0-preview.3" />
<PackageVersion Include="Synadia.Orbit.JetStream.Extensions" Version="1.0.0-preview.3" />
<PackageReference Include="Synadia.Orbit.JetStream.Extensions" />
paket add Synadia.Orbit.JetStream.Extensions --version 1.0.0-preview.3
#r "nuget: Synadia.Orbit.JetStream.Extensions, 1.0.0-preview.3"
#:package Synadia.Orbit.JetStream.Extensions@1.0.0-preview.3
#addin nuget:?package=Synadia.Orbit.JetStream.Extensions&version=1.0.0-preview.3&prerelease
#tool nuget:?package=Synadia.Orbit.JetStream.Extensions&version=1.0.0-preview.3&prerelease
JetStream Extensions
These utilities extend the NATS JetStream client functionality.
Direct batch
Direct batch is in beta. It only works with NATS Server 2.11.x.
The direct batch functionality leverages the direct message capabilities introduced in NATS Server 2.11. The functionality is described in ADR-31.
// dotnet add package nats.net
// dotnet add package Synadia.Orbit.JetStream.Extensions --prerelease
// Note: name (stream name), subject, and ct (a CancellationToken) are assumed to be defined by the caller.
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// The stream must allow direct access for batch direct get to work
await js.CreateStreamAsync(new StreamConfig(name, [subject]) { AllowDirect = true }, ct);
// Publish ten messages with payloads 0..9
for (int i = 0; i < 10; i++)
{
    await js.PublishAsync(subject, i, cancellationToken: ct);
}
// Request a batch of 8 messages starting at stream sequence 1
StreamMsgBatchGetRequest request = new()
{
    Batch = 8,
    Seq = 1,
};
int count = 0;
await foreach (NatsMsg<int> msg in js.GetBatchDirectAsync<int>(name, request, cancellationToken: ct))
{
    // Assert comes from xUnit; this snippet is taken from a test
    Assert.Equal(count++, msg.Data);
    Console.WriteLine($"GetBatchDirectAsync: {msg.Data}");
}
Assert.Equal(8, count);
Scheduled Messages
A stream can be configured to allow scheduled messages. A scheduled message is a message published
to a subject that will in turn publish the message to a target subject at a specified time or on a
repeating interval. See ADR-51
for the full specification. The stream must have AllowMsgSchedules = true.
Server requirement: Features marked 2.14+ are not yet released. To test them, install the server from main (requires Go):
go install github.com/nats-io/nats-server/v2@main
| Use case | Constructor | Source | Server version |
|---|---|---|---|
| Delayed publish | NatsMsgSchedule(DateTimeOffset, target) | null | 2.12+ |
| Recurring publish | NatsMsgSchedule(TimeSpan, target) | null | 2.14+ |
| Data sampling | NatsMsgSchedule(TimeSpan, target) { Source = ... } | set | 2.14+ |
Note: Cron expressions and timezone support are defined in ADR-51 but not yet implemented in the server. The
NatsMsgSchedule(string, string) raw constructor is available for forward compatibility with future schedule types.
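The raw constructor presumably takes a schedule expression as a string. The following is a minimal sketch of how such expressions could be built, assuming the `@at <RFC 3339 UTC timestamp>` and `@every <duration>` forms described in ADR-51. The helpers `AtExpression` and `EveryExpression` are hypothetical and not part of this package; check the ADR and the package source for the exact formats before relying on them.

```csharp
using System;
using System.Globalization;

// Hypothetical helpers, NOT part of Synadia.Orbit.JetStream.Extensions.
// Assumption: schedule expressions look like "@at <RFC 3339 UTC time>"
// and "@every <duration>", following ADR-51.

static string AtExpression(DateTimeOffset when) =>
    "@at " + when.ToUniversalTime().ToString("yyyy-MM-dd'T'HH:mm:ss'Z'", CultureInfo.InvariantCulture);

static string EveryExpression(TimeSpan interval)
{
    // The section on recurring publish states a minimum interval of 1 second.
    if (interval < TimeSpan.FromSeconds(1))
        throw new ArgumentOutOfRangeException(nameof(interval), "Minimum interval is 1 second.");

    // Whole seconds keep the sketch simple, e.g. 5 minutes becomes "300s".
    return "@every " + ((long)interval.TotalSeconds).ToString(CultureInfo.InvariantCulture) + "s";
}

Console.WriteLine(AtExpression(new DateTimeOffset(2026, 1, 2, 3, 4, 5, TimeSpan.Zero)));
Console.WriteLine(EveryExpression(TimeSpan.FromMinutes(5)));
```

For everyday use the typed constructors in the table above are likely the safer choice, since they avoid hand-formatting the expression.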
Delayed Publish (NATS Server 2.12+)
Use @at to deliver a message once at a future time:
// dotnet add package nats.net
// dotnet add package synadia.orbit.jetstream.extensions --prerelease
await using var client = new NatsClient();
var js = client.CreateJetStreamContext();
var stream = await js.CreateStreamAsync(new StreamConfig("SCHEDULING_STREAM", ["scheduling.>", "events.>"])
{
    AllowMsgSchedules = true,
    AllowMsgTTL = true,
});
// Schedule a message for 10 seconds from now
var scheduleAt = DateTimeOffset.UtcNow.AddSeconds(10);
var schedule = new NatsMsgSchedule(scheduleAt, "events.it_is_time")
{
    Ttl = TimeSpan.FromSeconds(15), // Optional: TTL on the produced message
};
var ack = await js.PublishScheduledAsync(
    subject: "scheduling.check_later",
    data: $"message for later {scheduleAt}",
    schedule: schedule);
ack.EnsureSuccess();
Recurring Publish (NATS Server 2.14+)
Use the TimeSpan constructor for repeating schedules (minimum interval is 1 second):
var schedule = new NatsMsgSchedule(TimeSpan.FromMinutes(5), "events.periodic_check");
var ack = await js.PublishScheduledAsync(
    subject: "scheduling.repeater",
    data: "periodic payload",
    schedule: schedule);
ack.EnsureSuccess();
Data Sampling with Source (NATS Server 2.14+)
Combine a repeating schedule with a source subject to periodically republish the latest message from one subject to another. When the schedule fires, it sources the latest message's data and headers from the source subject and publishes them to the target.
var stream = await js.CreateStreamAsync(new StreamConfig("SENSORS", ["sensors.*"])
{
    AllowMsgSchedules = true,
    AllowMsgTTL = true,
});
// Sensor data is published to sensors.raw by some producer
// ...
// Sample the latest sensor reading every 5 minutes
var schedule = new NatsMsgSchedule(TimeSpan.FromMinutes(5), "sensors.sampled")
{
    Source = "sensors.raw",
    Ttl = TimeSpan.FromMinutes(6),
};
var ack = await js.PublishScheduledAsync("sensors.schedule", schedule);
ack.EnsureSuccess();
// Every 5 minutes the server will:
// 1. Load the latest message from sensors.raw
// 2. Publish its data + headers to sensors.sampled
// 3. Add Nats-Scheduler and Nats-Schedule-Next headers
The source subject must be a literal (no wildcards) and must not match the schedule or target subjects. If no message exists on the source subject when the schedule fires, the schedule is removed without producing a message.
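The source-subject rules above can be checked on the client before publishing. This is only a sketch: `IsLiteralSubject` and `IsValidSource` are hypothetical helpers, not part of this package, and "must not match" is simplified here to plain string equality. The server remains the authority on what it accepts.

```csharp
using System;
using System.Linq;

// Hypothetical client-side pre-check, NOT part of Synadia.Orbit.JetStream.Extensions.

// A literal subject has no empty tokens and no wildcard tokens ("*" or ">").
static bool IsLiteralSubject(string subject) =>
    !string.IsNullOrEmpty(subject)
    && subject.Split('.').All(token => token.Length > 0 && token != "*" && token != ">");

// Assumption: "must not match" is approximated by ordinal string inequality.
static bool IsValidSource(string source, string scheduleSubject, string target) =>
    IsLiteralSubject(source)
    && source != scheduleSubject
    && source != target;

Console.WriteLine(IsValidSource("sensors.raw", "sensors.schedule", "sensors.sampled")); // True
Console.WriteLine(IsValidSource("sensors.*", "sensors.schedule", "sensors.sampled"));   // False (wildcard)
Console.WriteLine(IsValidSource("sensors.raw", "sensors.raw", "sensors.sampled"));      // False (same as schedule subject)
```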
Source also works with one-shot @at schedules for a single delayed republish:
var schedule = new NatsMsgSchedule(DateTimeOffset.UtcNow.AddMinutes(10), "sensors.snapshot")
{
    Source = "sensors.raw",
};
TTL Options
- Minimum TTL is 1 second
- Use TimeSpan.MaxValue to indicate the produced message should never expire ("never")
- The stream must have AllowMsgTTL = true when using TTL
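The TTL rules listed above could be enforced with a small guard before building a schedule. This is a sketch only: `DescribeTtl` is a hypothetical helper, not part of this package, and the whole-seconds output format (for example "15s") is an assumption rather than the package's confirmed wire format.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, NOT part of Synadia.Orbit.JetStream.Extensions.
static string DescribeTtl(TimeSpan ttl)
{
    // TimeSpan.MaxValue means the produced message should never expire.
    if (ttl == TimeSpan.MaxValue)
        return "never";

    // Anything below the documented 1-second minimum is rejected.
    if (ttl < TimeSpan.FromSeconds(1))
        throw new ArgumentOutOfRangeException(nameof(ttl), "Minimum TTL is 1 second.");

    // Assumption: whole seconds with an "s" suffix, e.g. "15s".
    return ((long)ttl.TotalSeconds).ToString(CultureInfo.InvariantCulture) + "s";
}

Console.WriteLine(DescribeTtl(TimeSpan.FromSeconds(15))); // 15s
Console.WriteLine(DescribeTtl(TimeSpan.MaxValue));        // never
```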
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 is compatible. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Microsoft.Bcl.AsyncInterfaces (>= 8.0.0)
  - NATS.Client.JetStream (>= 2.7.2)
  - System.Buffers (>= 4.5.1)
  - System.Diagnostics.DiagnosticSource (>= 8.0.1)
  - System.Memory (>= 4.5.5)
  - System.Threading.Channels (>= 8.0.0)
- .NETStandard 2.1
  - NATS.Client.JetStream (>= 2.7.2)
  - System.Diagnostics.DiagnosticSource (>= 8.0.1)
  - System.Threading.Channels (>= 8.0.0)
- net10.0
  - NATS.Client.JetStream (>= 2.7.2)
- net8.0
  - NATS.Client.JetStream (>= 2.7.2)
- net9.0
  - NATS.Client.JetStream (>= 2.7.2)
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0-preview.3 | 44 | 2/23/2026 |
| 1.0.0-preview.2 | 154 | 11/27/2025 |
| 1.0.0-preview.1 | 616 | 2/27/2025 |