Synadia.Orbit.JetStream.Publisher 1.0.0-preview.4

Prefix Reserved
This is a prerelease version of Synadia.Orbit.JetStream.Publisher.
There is a newer prerelease version of this package available.
See the version list below for details.
dotnet add package Synadia.Orbit.JetStream.Publisher --version 1.0.0-preview.4
                    
NuGet\Install-Package Synadia.Orbit.JetStream.Publisher -Version 1.0.0-preview.4
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Synadia.Orbit.JetStream.Publisher" Version="1.0.0-preview.4" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Synadia.Orbit.JetStream.Publisher" Version="1.0.0-preview.4" />
                    
Directory.Packages.props
<PackageReference Include="Synadia.Orbit.JetStream.Publisher" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Synadia.Orbit.JetStream.Publisher --version 1.0.0-preview.4
                    
#r "nuget: Synadia.Orbit.JetStream.Publisher, 1.0.0-preview.4"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Synadia.Orbit.JetStream.Publisher@1.0.0-preview.4
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Synadia.Orbit.JetStream.Publisher&version=1.0.0-preview.4&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=Synadia.Orbit.JetStream.Publisher&version=1.0.0-preview.4&prerelease
                    
Install as a Cake Tool

JetStream Publisher

Higher-level publishers for NATS JetStream:

  • Backpressure publisher (JetStreamPublisher<T>) — high-throughput publishing with an async ack channel so producers can pipeline without awaiting every ack inline.
  • Atomic batch publisher (NatsJSBatchPublisher) — stage a group of messages and commit them atomically (all-or-nothing), with optional flow control. Requires NATS Server 2.12+ and AllowAtomicPublish = true on the stream. See ADR-50.

Backpressure Publisher

// dotnet add package nats.net
// dotnet add package Synadia.Orbit.JetStream.Publisher
await using var client = new NatsClient();
var js = client.CreateJetStreamContext();

var stream = await js.CreateStreamAsync(new StreamConfig("TEST_STREAM", ["test.>"]));
await stream.PurgeAsync(new StreamPurgeRequest { Keep = 0 });

JetStreamPublisher<string> publisher = client.CreateOrbitJetStreamPublisher<string>();

await publisher.StartAsync();

int numberOfMessages = 1_000_000;

var sub = Task.Run(
    async () =>
    {
        int count = 0;
        await foreach (var status in publisher.SubscribeAsync())
        {
            count++;

            if (!status.Acknowledged)
            {
                Console.WriteLine($"Error publishing message: {status.Subject}: {status.Error.GetType().Name}");
            }

            if (count == numberOfMessages)
            {
                break;
            }
        }
    });

Stopwatch stopwatch = Stopwatch.StartNew();

for (int i = 0; i < numberOfMessages; i++)
{
    await publisher.PublishAsync($"test.msg{i}", $"Test message {i}");
}

await sub;

stopwatch.Stop();

Console.WriteLine($"Took {stopwatch.Elapsed} at {numberOfMessages / stopwatch.Elapsed.TotalSeconds:N0} msgs/sec");

Atomic Batch Publishing

Stage messages with AddAsync / AddMsgAsync and finalize with CommitAsync / CommitMsgAsync. The server persists the batch atomically: either every staged message is written, or none are. Call Discard() to abandon a batch without committing. The default server-side limit is 1000 messages per batch.

// dotnet add package nats.net
// dotnet add package Synadia.Orbit.JetStream.Publisher
await using var client = new NatsClient();
var js = client.CreateJetStreamContext();

await js.CreateStreamAsync(new StreamConfig("ORDERS", ["orders.>"])
{
    AllowAtomicPublish = true,
});

await using var batch = new NatsJSBatchPublisher(js);

await batch.AddAsync("orders.1", "first"u8.ToArray());
await batch.AddAsync("orders.2", "second"u8.ToArray());

NatsJSBatchAck ack = await batch.CommitAsync("orders.3", "third"u8.ToArray());

Console.WriteLine($"Committed {ack.BatchSize} messages as batch {ack.BatchId} to {ack.Stream}");

For a one-shot batch of messages already in memory, use the PublishMsgBatchAsync extension on INatsJSContext:

var messages = new[]
{
    new NatsMsg<byte[]> { Subject = "orders.a", Data = "a"u8.ToArray() },
    new NatsMsg<byte[]> { Subject = "orders.b", Data = "b"u8.ToArray() },
    new NatsMsg<byte[]> { Subject = "orders.c", Data = "c"u8.ToArray() },
};

NatsJSBatchAck ack = await js.PublishMsgBatchAsync(messages);

Per-message options

Pass NatsJSBatchMsgOpts to set per-message TTL and server-side expectations:

await batch.AddAsync(
    "orders.1",
    "first"u8.ToArray(),
    new NatsJSBatchMsgOpts
    {
        Stream = "ORDERS",
        LastSeq = 42,
        Ttl = TimeSpan.FromMinutes(5),
    });

Supported options: Ttl, Stream, LastSeq, LastSubjectSeq, LastSubject.

Flow control

Pass NatsJSBatchFlowControl to the publisher to wait for intermediate acks and avoid overrunning the server:

await using var batch = new NatsJSBatchPublisher(
    js,
    new NatsJSBatchFlowControl
    {
        AckFirst = true,                      // wait for ack on the first message
        AckEvery = 100,                       // then wait every Nth message
        AckTimeout = TimeSpan.FromSeconds(5),
    });

Error handling

All batch publish errors derive from NatsJSBatchPublishException. Catch the base type or a specific subtype per server error code:

Exception Error code
NatsJSBatchPublishNotEnabledException 10174
NatsJSBatchPublishMissingSeqException 10175
NatsJSBatchPublishIncompleteException 10176
NatsJSBatchPublishUnsupportedHeaderException 10177
NatsJSBatchPublishExceedsLimitException 10199

NatsJSBatchClosedException is thrown when using a publisher after commit or discard. NatsJSInvalidBatchAckException is thrown when the server's batch ack response does not match the committed batch.

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 is compatible. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.0-preview.7 70 5/8/2026
1.0.0-preview.6 47 5/6/2026
1.0.0-preview.5 70 4/29/2026
1.0.0-preview.4 59 4/23/2026
1.0.0-preview.3 74 3/17/2026
1.0.0-preview.2 67 2/23/2026
1.0.0-preview.1 211 3/11/2025