Synadia.Orbit.JetStream.Publisher
1.0.0-preview.6
Prefix Reserved
See the version list below for details.
dotnet add package Synadia.Orbit.JetStream.Publisher --version 1.0.0-preview.6
NuGet\Install-Package Synadia.Orbit.JetStream.Publisher -Version 1.0.0-preview.6
<PackageReference Include="Synadia.Orbit.JetStream.Publisher" Version="1.0.0-preview.6" />
<PackageVersion Include="Synadia.Orbit.JetStream.Publisher" Version="1.0.0-preview.6" />
<PackageReference Include="Synadia.Orbit.JetStream.Publisher" />
paket add Synadia.Orbit.JetStream.Publisher --version 1.0.0-preview.6
#r "nuget: Synadia.Orbit.JetStream.Publisher, 1.0.0-preview.6"
#:package Synadia.Orbit.JetStream.Publisher@1.0.0-preview.6
#addin nuget:?package=Synadia.Orbit.JetStream.Publisher&version=1.0.0-preview.6&prerelease
#tool nuget:?package=Synadia.Orbit.JetStream.Publisher&version=1.0.0-preview.6&prerelease
JetStream Publisher
Higher-level publishers for NATS JetStream:
- Backpressure publisher (
JetStreamPublisher<T>) — high-throughput publishing with an async ack channel so producers can pipeline without awaiting every ack inline. - Atomic batch publisher (
NatsJSBatchPublisher) — stage a group of messages and commit them atomically (all-or-nothing), with optional flow control. Requires NATS Server 2.12+ andAllowAtomicPublish = trueon the stream. See ADR-50. - Fast-ingest publisher (
NatsJSFastPublisher) — high-throughput batch publishing without atomicity. Server-driven flow control, optional per-message gap reporting. Requires NATS Server 2.14+ andAllowBatchPublish = trueon the stream.
Backpressure Publisher
// dotnet add package nats.net
// dotnet add package Synadia.Orbit.JetStream.Publisher
await using var client = new NatsClient();
var js = client.CreateJetStreamContext();
var stream = await js.CreateStreamAsync(new StreamConfig("TEST_STREAM", ["test.>"]));
await stream.PurgeAsync(new StreamPurgeRequest { Keep = 0 });
JetStreamPublisher<string> publisher = client.CreateOrbitJetStreamPublisher<string>();
await publisher.StartAsync();
int numberOfMessages = 1_000_000;
var sub = Task.Run(
async () =>
{
int count = 0;
await foreach (var status in publisher.SubscribeAsync())
{
count++;
if (!status.Acknowledged)
{
Console.WriteLine($"Error publishing message: {status.Subject}: {status.Error.GetType().Name}");
}
if (count == numberOfMessages)
{
break;
}
}
});
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < numberOfMessages; i++)
{
await publisher.PublishAsync($"test.msg{i}", $"Test message {i}");
}
await sub;
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.Elapsed} at {numberOfMessages / stopwatch.Elapsed.TotalSeconds:N0} msgs/sec");
Atomic Batch Publishing
Stage messages with AddAsync / AddMsgAsync and finalize with CommitAsync /
CommitMsgAsync. The server persists the batch atomically: either every staged message
is written, or none are. Call Discard() to abandon a batch without committing. The
default server-side limit is 1000 messages per batch.
// dotnet add package nats.net
// dotnet add package Synadia.Orbit.JetStream.Publisher
await using var client = new NatsClient();
var js = client.CreateJetStreamContext();
await js.CreateStreamAsync(new StreamConfig("ORDERS", ["orders.>"])
{
AllowAtomicPublish = true,
});
await using var batch = new NatsJSBatchPublisher(js);
await batch.AddAsync("orders.1", "first"u8.ToArray());
await batch.AddAsync("orders.2", "second"u8.ToArray());
NatsJSBatchAck ack = await batch.CommitAsync("orders.3", "third"u8.ToArray());
Console.WriteLine($"Committed {ack.BatchSize} messages as batch {ack.BatchId} to {ack.Stream}");
For a one-shot batch of messages already in memory, use the PublishMsgBatchAsync
extension on INatsJSContext:
var messages = new[]
{
new NatsMsg<byte[]> { Subject = "orders.a", Data = "a"u8.ToArray() },
new NatsMsg<byte[]> { Subject = "orders.b", Data = "b"u8.ToArray() },
new NatsMsg<byte[]> { Subject = "orders.c", Data = "c"u8.ToArray() },
};
NatsJSBatchAck ack = await js.PublishMsgBatchAsync(messages);
Per-message options
Pass NatsJSBatchMsgOpts to set per-message TTL and server-side expectations:
await batch.AddAsync(
"orders.1",
"first"u8.ToArray(),
new NatsJSBatchMsgOpts
{
Stream = "ORDERS",
LastSeq = 42,
Ttl = TimeSpan.FromMinutes(5),
});
Supported options: Ttl, Stream, LastSeq, LastSubjectSeq, LastSubject.
Flow control
Pass NatsJSBatchFlowControl to the publisher to wait for intermediate acks and avoid
overrunning the server:
await using var batch = new NatsJSBatchPublisher(
js,
new NatsJSBatchFlowControl
{
AckFirst = true, // wait for ack on the first message
AckEvery = 100, // then wait every Nth message
AckTimeout = TimeSpan.FromSeconds(5),
});
Error handling
All batch publish errors derive from NatsJSBatchPublishException. Catch the base type
or a specific subtype per server error code:
| Exception | Error code |
|---|---|
NatsJSBatchPublishNotEnabledException |
10174 |
NatsJSBatchPublishMissingSeqException |
10175 |
NatsJSBatchPublishIncompleteException |
10176 |
NatsJSBatchPublishUnsupportedHeaderException |
10177 |
NatsJSBatchPublishExceedsLimitException |
10199 |
NatsJSBatchClosedException is thrown when using a publisher after commit or discard.
NatsJSInvalidBatchAckException is thrown when the server's batch ack response does not
match the committed batch.
Fast-Ingest Batch Publishing
NatsJSFastPublisher delivers high-throughput batch publishing without atomicity.
Individual messages may be lost (gaps) and a partial batch may persist. The server
drives flow control: AddAsync / AddMsgAsync stalls when the configured outstanding
ack window is exceeded. Stream message-count limit per batch is unbounded.
// dotnet add package nats.net
// dotnet add package Synadia.Orbit.JetStream.Publisher
await using var client = new NatsClient();
var js = client.CreateJetStreamContext();
await js.CreateStreamAsync(new StreamConfig("METRICS", ["metrics.>"])
{
AllowBatchPublish = true,
});
await using var batch = js.CreateOrbitFastPublisher();
for (int i = 0; i < 100_000; i++)
{
await batch.AddAsync($"metrics.{i % 16}", $"value-{i}"u8.ToArray());
}
NatsJSBatchAck ack = await batch.CloseAsync();
Console.WriteLine($"Persisted {ack.BatchSize} messages to {ack.Stream}");
Use CommitAsync / CommitMsgAsync to publish a final message and commit, or
CloseAsync to commit without storing a final message (end-of-batch).
Flow control and gap handling
await using var batch = js.CreateOrbitFastPublisher(new NatsJSFastPublisherOpts
{
ContinueOnGap = true, // server reports gaps but keeps going
ErrorHandler = ex => Console.WriteLine(ex), // gaps and per-message errors
FlowControl = new NatsJSFastPublishFlowControl
{
Flow = 200,
MaxOutstandingAcks = 4,
AckTimeout = TimeSpan.FromSeconds(5),
},
});
When ContinueOnGap is false (default) the server abandons the batch on the first
gap. When true, gaps surface via ErrorHandler as NatsJSFastPublishGapException
and the batch continues. Per-message server errors arrive as
NatsJSFastPublishMessageException.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 is compatible. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.0
- Microsoft.Bcl.AsyncInterfaces (>= 8.0.0)
- Microsoft.Bcl.Memory (>= 9.0.14)
- NATS.Client.JetStream (>= 2.8.0-preview.2)
- System.Diagnostics.DiagnosticSource (>= 8.0.1)
- System.Threading.Channels (>= 8.0.0)
-
.NETStandard 2.1
- NATS.Client.JetStream (>= 2.8.0-preview.2)
- System.Diagnostics.DiagnosticSource (>= 8.0.1)
- System.Threading.Channels (>= 8.0.0)
-
net10.0
- NATS.Client.JetStream (>= 2.8.0-preview.2)
-
net8.0
- NATS.Client.JetStream (>= 2.8.0-preview.2)
-
net9.0
- NATS.Client.JetStream (>= 2.8.0-preview.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0-preview.7 | 70 | 5/8/2026 |
| 1.0.0-preview.6 | 47 | 5/6/2026 |
| 1.0.0-preview.5 | 70 | 4/29/2026 |
| 1.0.0-preview.4 | 59 | 4/23/2026 |
| 1.0.0-preview.3 | 74 | 3/17/2026 |
| 1.0.0-preview.2 | 67 | 2/23/2026 |
| 1.0.0-preview.1 | 211 | 3/11/2025 |