TurboMqtt 0.2.0
dotnet add package TurboMqtt --version 0.2.0
NuGet\Install-Package TurboMqtt -Version 0.2.0
<PackageReference Include="TurboMqtt" Version="0.2.0" />
paket add TurboMqtt --version 0.2.0
#r "nuget: TurboMqtt, 0.2.0"
// Install TurboMqtt as a Cake Addin #addin nuget:?package=TurboMqtt&version=0.2.0 // Install TurboMqtt as a Cake Tool #tool nuget:?package=TurboMqtt&version=0.2.0
TurboMqtt
TurboMqtt is a high-speed Message Queue Telemetry Transport (MQTT) client designed to support large-scale IOT workloads, handling over 100k msg/s from any MQTT 3.1.1+ broker.
TurboMqtt is written on top of Akka.NET and Akka.Streams, which is the secret to its efficient use of resources and high throughput.
Key Features
- MQTT 3.1.1 support;
- Extremely high performance - hundreds of thousands of messages per second;
- Extremely resource-efficient - pools memory and leverages asynchronous I/O best practices;
- Extremely robust fault tolerance - this is one of Akka.NET's great strengths and we've leveraged it in TurboMqtt;
- Supports all MQTT quality of service levels, with automatic publishing retries for QoS 1 and 2;
- Full OpenTelemetry support;
- Automatic retry-reconnect in broker disconnect scenarios;
- Full support for IAsyncEnumerable and backpressure on the receiver side;
- Automatically de-duplicates packets on the receiver side; and
- Automatically acks QoS 1 and QoS 2 packets.
Simple interface that works at very high rates of speed with minimal resource utilization.
Documentation
- QuickStart
- Performance
- OpenTelemetry Support
- MQTT 3.1.1 Roadmap
- MQTT 5.0 Roadmap
- MQTT over Quic Roadmap
QuickStart
To get started with TurboMqtt:
dotnet add package TurboMqtt
And from there, you can call AddTurboMqttClientFactory
on your IServiceCollection
:
var builder = new HostBuilder();
builder
.ConfigureAppConfiguration(configBuilder =>
{
configBuilder
.AddJsonFile("appsettings.json", optional: false);
})
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole();
})
.ConfigureServices(s =>
{
s.AddTurboMqttClientFactory();
// HostedService is going to use TurboMqtt
s.AddHostedService<MqttProducerService>();
});
var host = builder.Build();
await host.RunAsync();
And inject IMqttClientFactory
into your ASP.NET Controllers, SignalR Hubs, gRPC services, IHostedService
s, etc and create IMqttClient
instances:
var tcpClientOptions = new MqttClientTcpOptions(config.Host, config.Port);
var clientConnectOptions = new MqttClientConnectOptions(config.ClientId, MqttProtocolVersion.V3_1_1)
{
UserName = config.User,
Password = config.Password
};
await using IMqttClient client = await _clientFactory.CreateTcpClient(clientConnectOptions, tcpClientOptions);
// connect to the broker
var connectResult = await client.ConnectAsync(linkedCts.Token);
if (!connectResult.IsSuccess)
{
_logger.LogError("Failed to connect to MQTT broker at {0}:{1} - {2}", config.Host, config.Port,
connectResult.Reason);
return;
}
Publishing Messages
Publishing messages with TurboMqtt is easy:
foreach (var i in Enumerable.Range(0, config.MessageCount))
{
var msg = new MqttMessage(config.Topic, CreatePayload(i, TargetMessageSize.EightKb))
{
QoS = QualityOfService.AtLeastOnce
};
IPublishResult publishResult = await client.PublishAsync(msg, stoppingToken);
if(i % 1000 == 0)
{
_logger.LogInformation("Published {0} messages", i);
}
}
The IPublishResult.IsSuccess
property will return true
when:
QualityOfService.AtMostOnce
(QoS 0) - as soon as the message has queued for delivery;QualityOfService.AtLeastOnce
(QoS 1) - after we've received aPubAck
from the broker, confirming receipt; andQualityOfService.ExactlyOnce
(QoS 2) - after we've completed the full MQTT QoS 2 exchange and received the finalPubComp
acknowledgement from the broker.
TurboMqtt will automatically retry delivery of messages in the event of overdue ACKs from the broker.
Receiving Messages
TurboMqtt is backpressure-aware and thus exposes the stream of received MqttMessage
s to consumers via System.Threading.Channel<MqttMessage>
:
ISubscribeResult subscribeResult = await client.SubscribeAsync(config.Topic, config.QoS, linkedCts.Token);
if (!subscribeResult.IsSuccess)
{
_logger.LogError("Failed to subscribe to topic {0} - {1}", config.Topic, subscribeResult.Reason);
return;
}
_logger.LogInformation("Subscribed to topic {0}", config.Topic);
var received = 0;
ChannelRead<MqttMessage> receivedMessages = client.ReceivedMessages;
while (await receivedMessages.WaitToReadAsync(stoppingToken))
{
while (receivedMessages.TryRead(out MqttMessage m))
{
_logger.LogInformation("Received message [{0}] for topic [{1}]", m.Payload, m.Topic);
}
}
If we've subscribed using QualityOfService.AtMostOnce
or QualityOfService.ExactlyOnce
, TurboMqtt has already fully ACKed the message for you by the time you receive it and we use a per-topic de-duplication buffer to detect and remove duplicates.
Licensing
TurboMqtt is available under the Apache 2.0 license.
Support
To get support with TurboMqtt, either fill out the help form on Sdkbin or file an issue on the TurboMqtt repository.
TurboMqtt developed and maintained by Petabridge, the company behind Akka.NET.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net8.0
- Akka (>= 1.5.25)
- Akka.Streams (>= 1.5.25)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.1)
- OpenTelemetry.Api.ProviderBuilderExtensions (>= 1.8.1)
- System.IO.Pipelines (>= 8.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
0.2.0 | 334 | 6/17/2024 |
0.1.1 | 372 | 5/2/2024 |
0.1.0 | 101 | 5/1/2024 |
0.1.0-beta3 | 110 | 4/26/2024 |
* License has been migrated to Apache 2.0
* Upgraded to [Akka.NET v1.5.25](https://github.com/akkadotnet/akka.net/releases/tag/1.5.25).