Adaptare 2.2.0

dotnet add package Adaptare --version 2.2.0
                    
NuGet\Install-Package Adaptare -Version 2.2.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Adaptare" Version="2.2.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Adaptare" Version="2.2.0" />
                    
Directory.Packages.props
<PackageReference Include="Adaptare" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Adaptare --version 2.2.0
                    
#r "nuget: Adaptare, 2.2.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Adaptare@2.2.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Adaptare&version=2.2.0
                    
Install as a Cake Addin
#tool nuget:?package=Adaptare&version=2.2.0
                    
Install as a Cake Tool

Adaptare

A lightweight, transport-agnostic .NET message queue abstraction library for sending, receiving, and processing messages across different transports (Direct/in-process, NATS, RabbitMQ).

Overview

Adaptare provides a small set of core abstractions that let you implement and register message handlers and processors without coupling your application logic to a specific transport.

Key abstractions:

  • IMessageSender — Publish, Send, or Request messages (the multiplexer is registered at startup).
  • IMessageReceiver<TSubscriptionSettings> — Subscribe to messages for a specific transport.
  • IMessageExchange — Match a subject/header (MatchAsync) and return a transport-specific IMessageSender.
  • IMessageHandler<T> / IMessageProcessor<T,TR> — Server-side handler/processor.
  • ISubscribeRegistration / IMessageQueueBackgroundRegistration — Background registration and subscription lifecycle management.

Register AddMessageQueue() during startup to add the multiplexer sender, and then add transports with their respective extension methods.

Quick start

dotnet restore
dotnet build -c Release
dotnet test -c Release --no-restore

There is a Direct transport sample under samples/Adaptare.Sample.Direct demonstrating a minimal in-process flow.

Getting started (DI and startup)

Adaptare uses a builder pattern that integrates with Microsoft DI. Register the MessageQueue in your application startup:

builder.Services
    .AddMessageQueue();

Select transports and register handlers/processors with the transport-specific extensions. Direct transport example:

builder.Services
    .AddMessageQueue()
    .AddDirectMessageQueue(cfg => cfg
        .AddProcessor<ProcessorType>("subject1")
        .AddHandler<HandlerType>("subject2")
        .AddReplyHandler<HandlerType2>("subject3"))
    .AddDirectGlobPatternExchange("*");

NATS and RabbitMQ have analogous Add{Transport}MessageQueue(...) extension points.

AddMessageQueue() registers a MultiplexerMessageSender as IMessageSender. The multiplexer uses the configured exchanges (MessageExchangeOptions.Exchanges) and selects the first matching IMessageExchange by invoking MatchAsync.

Message semantics

API methods on IMessageSender express different messaging semantics:

  • PublishAsync: fire-and-forget (one-way).
  • RequestAsync<TMessage, TReply>: Request/Reply: waits for a TReply. If the reply contains MQ_Fail in the header, a MessageProcessFailException is thrown at the caller.
  • SendAsync: transport-specific semantics; some transports (for example, NATS) support sending raw bytes and receiving replies; others may throw NotSupportedException.

Example usage:

var messageSender = serviceProvider.GetRequiredService<IMessageSender>();
await messageSender.PublishAsync("subject1", requestBytes, headers, cancellationToken);

var reply = await messageSender.RequestAsync<byte[], MyReply>("subject1", requestBytes, headers);

Headers, tracing, and failures

Use MessageHeaderValue to carry message metadata. The library provides TraceContextPropagator to inject and extract distributed tracing context from headers (Activity/TraceContext propagation).

Common header keys are defined in MessageHeaderValueConsts:

  • MQ_Ask_Id, MQ_Reply_Id, MQ_Reply, and MQ_Fail.

If a remote handler needs to indicate failure, it sets MQ_Fail on the reply header. The request caller then receives MessageProcessFailException.

Subscriptions and ack behavior

Subscriptions are registered with ISubscribeRegistration, and background registrations create actual consumers. Typical RabbitMQ scenarios:

  • SubscribeRegistration<TMessage, THandler> — Basic handler with AutoAck set to true or false. If AutoAck=false, the consumer must call BasicAck on success and can call BasicNack on failure.
  • AcknowledgeSubscribeRegistration<TMessage, THandler> — Provides advanced ack control using IAcknowledgeMessage<T> and IAcknowledgeMessageHandler<T> (Ack, Nak, progress reports, aborts).

Background registration implementations include NatsBackgroundRegistration and RabbitMQBackgroundRegistration.

Adding transports or exchanges

To add a new exchange or transport:

  1. Implement IMessageExchange:
    • MatchAsync(subject, header) — when this exchange should handle the subject.
    • GetMessageSenderAsync(subject, IServiceProvider) — return a transport-specific IMessageSender.
  2. Register your exchange via MessageQueueConfiguration.AddExchange(exchange) or implement an Add{Transport}MessageQueue extension.

Note the multiplexer uses the first matching exchange, so the registration order affects routing. Use MessageQueueConfiguration.PushExchange if you need to prioritize an exchange.

Serializers

Each transport exposes a serializer registry (for example, IRabbitMQSerializerRegistry and INatsSerializerRegistry). Use GetSerializer<T> and GetDeserializer<T> for typed serialization and deserialization.

Testing

  • Tests use xUnit + NSubstitute and are located under Adaptare.*.UnitTests.
  • During unit tests, use AddFake{Transport}MessageQueue to avoid starting background registrations or creating real network connections (see AddFakeNatsMessageQueue and AddFakeRabbitMessageQueue).

Troubleshooting & common issues

  • MessageSenderNotFoundException: thrown when no exchange matches the subject — verify you registered an exchange (or used Add*GlobPatternExchange) and check exchange order.
  • MessageProcessFailException: thrown when a reply header contains MQ_Fail.
  • RabbitMQ ack issues: if AutoAck = false, ensure BasicAck is called when processing succeeds, otherwise messages will be requeued.
  • NATS connectivity issues: verify INatsConnectionManager configuration and registerName settings.

Contributing & code style

See .github/copilot-instructions.md for Copilot guidance and contributor guidelines. Short code style pointers:

  • PascalCase for types, I prefix for interfaces.
  • Private static fields start with _, non-static private fields with m_ (project convention).
  • Tabs for indentation (size 4), lines limited to 100 characters, CRLF line endings.

When adding a new transport or feature, prefer adding an IMessageExchange implementation and writing appropriate tests.


Snippets

Dependency Injection:

builder.Services
    .AddMessageQueue();

Configure a transport (Direct):

builder.Services
    .AddMessageQueue()
    .AddDirectMessageQueue(config => config
        .AddProcessor<ProcessorType>("subject1")
        .AddHandler<HandlerType>("subject2")
        .AddReplyHandler<HandlerType2>("subject3"))
    .AddDirectGlobPatternExchange("*");

Sending messages:

var messageSender = serviceProvider.GetRequiredService<IMessageSender>();

var request = new SendMessageType
{
    // fill request payload
};

await messageSender.PublishAsync("subject1", request.ToByteArray(), cancellationToken).ConfigureAwait(false);

// or

var responseData = await messageSender.RequestAsync<byte[]>("subject1", request.ToByteArray(), cancellationToken).ConfigureAwait(false);

// parse response if typed
var response = ReceiveType.Parser.ParseFrom(responseData.Span);

Receiving messages:

internal class ProcessorType : IMessageProcessor
{
    public async ValueTask<ReadOnlyMemory<byte>> HandleAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
    {
        var request = SendMessageType.Parser.ParseFrom(data.Span);

        // ...process request

        return response.ToByteArray();
    }
}

internal class HandlerType : IMessageHandler
{
    public async ValueTask HandleAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
    {
        var request = SendMessageType.Parser.ParseFrom(data.Span);

        // ...process request
    }
}

License

MIT

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Adaptare:

Package Downloads
Adaptare.RabbitMQ

Adaptare is a library developed to abstract the sending, receiving, and processing of message transmission.

Adaptare.Direct

Adaptare is a library developed to abstract the sending, receiving, and processing of message transmission.

Adaptare.Nats

Adaptare is a library developed to abstract the sending, receiving, and processing of message transmission.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.2.0 162 12/12/2025
2.1.0 257 11/24/2025
2.0.2 580 7/28/2025
2.0.1 175 7/28/2025
2.0.0 325 5/18/2025
1.0.1-alpha 192 5/17/2025
1.0.0 225 1/12/2025