Galosys.Foundation.RabbitMQ.Client
26.5.20.1
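Status: stable. A production-grade RabbitMQ client: connection pool + batch pipeline + Publisher Confirms + OTel observability.
A production-grade implementation built on RabbitMQ.Client. It extends the Galosys.Foundation.Amqp abstraction layer and provides a connection pool, a batch publish pipeline, Publisher Confirms, topology caching, DLX/DLQ dead-lettering, OpenTelemetry tracing and metrics, and other enterprise-grade features.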
Architecture
Galosys.Foundation.Amqp ← abstraction layer (AmqpTemplate / AmqpPublisher / AmqpConsumer)
│
▼
Galosys.Foundation.RabbitMQ.Client ← this module (RabbitTemplate / RabbitPublisher / RabbitConsumer)
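Core features
- Connection pool: separate producer and consumer connection pools, round-robin Channel allocation, semaphore-based throttling
- Direct channel send: channel reuse, Properties reuse, encoding, Publisher Confirm tracking + timeout handling
- Automatic topology management: TopologyCache caches declared queues/exchanges and rebuilds them automatically after a reconnect
- DLX/DLQ dead-lettering: failed messages are routed automatically to a dead-letter exchange + dead-letter queue
- Exponential backoff retry: failed deliveries are retried automatically on the consumer side (x-delayed-message) and moved to the DLQ once the retry limit is exceeded
- Multi-host support: pings the configured hosts to pick the best node, with automatic recovery
- OTel tracing: W3C traceparent propagation, Producer/Consumer Span
- OTel metrics: 7 Counter/Histogram/Gauge instruments (published, consumed, rejected, retried, publish latency, batch size, active connections)
- Health check: RabbitMQHealthCheck aggregates the state of the producer and consumer connection pools (Healthy/Degraded/Unhealthy)
- IMessageBus: RabbitMessageBus implements the Core IMessageBus interface (Send + Publish)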
Prerequisites
- RabbitMQ 3.x+ server
- The rabbitmq_delayed_message_exchange plugin (required for delayed messages)
- Enabling rabbitmq_management is recommended (for easier operations and monitoring)
Installation
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />
Quick start
1. Configuration
{
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/"
  }
}
AppName defaults to the application name; AutoCreateEnabled defaults to true, so queues and exchanges are created automatically.
2. Register services
// Option 1: automatic module discovery (recommended)
await Host.CreateDefaultBuilder()
    .UseModularization()
    .Locate()
    .RunAsync();
// Option 2: manual registration
services.AddRabbitMQ(configuration);
3. Send messages
using RabbitMQ.Client;
public class OrderCreatedMessage
{
    public string OrderNo { get; set; }
}
public class OrderService
{
    private readonly AmqpTemplate _amqp;
    public OrderService(AmqpTemplate amqp) => _amqp = amqp;
    // Send a single message
    public async Task SendAsync() =>
        await _amqp.SendAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });
    // Send a batch
    public async Task SendBatchAsync() =>
        await _amqp.SendAsync("order_created", new[]
        {
            new OrderCreatedMessage { OrderNo = "ORD001" },
            new OrderCreatedMessage { OrderNo = "ORD002" }
        });
    // Broadcast a message (fanout)
    public async Task BroadcastAsync() =>
        await _amqp.BroadcastAsync("order_events", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });
    // Delayed message
    public async Task DelayAsync() =>
        await _amqp.DelayAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } }, delayMillionSeconds: 5000);
    // Custom message properties (for example MessageId)
    public async Task SendWithPropsAsync() =>
        await _amqp.SendAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } },
            postMessageProcessor: props =>
            {
                props.MessageId = Guid.NewGuid().ToString();
                props.SetCustomProperty("x-correlation-id", "corr-123");
            });
}
4. Consume messages
using RabbitMQ.Client;
[Handler]
public class OrderHandler : IMessageHandler
{
    [AmqpHandler("order_created")]
    public async Task<bool> HandleAsync(IMessage msg)
    {
        // Process the message
        return true; // true = ACK, false = Nack → retry or move to the DLQ
    }
    // Broadcast consumption (fanout + durable queue)
    [AmqpHandler("order_events", Broadcast = true, Durable = true)]
    public async Task<bool> OnEventAsync(IMessage msg)
    {
        return true;
    }
}
Registration: classes marked with [Handler] are registered in DI automatically; methods marked with [AmqpHandler] are scanned and subscribed automatically at startup.
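The registration mechanism described above can be pictured with a small reflection sketch. This is not the package's scanner: `DemoHandlerAttribute`, `DemoAmqpHandlerAttribute`, `DemoOrderHandler`, and `HandlerScanner` are hypothetical stand-ins, and the real `[Handler]` / `[AmqpHandler]` types may carry different members.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-ins for [Handler] and [AmqpHandler]; not the library's types.
[AttributeUsage(AttributeTargets.Class)]
public sealed class DemoHandlerAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Method)]
public sealed class DemoAmqpHandlerAttribute : Attribute
{
    public DemoAmqpHandlerAttribute(string queue) => Queue = queue;
    public string Queue { get; }
    public bool Broadcast { get; set; }
}

// Sample handler class used only to exercise the scanner.
[DemoHandler]
public class DemoOrderHandler
{
    [DemoAmqpHandler("order_created")]
    public bool Handle(string body) => true;

    [DemoAmqpHandler("order_events", Broadcast = true)]
    public bool OnEvent(string body) => true;
}

public static class HandlerScanner
{
    // Finds public instance methods carrying the method attribute,
    // but only on concrete classes carrying the class attribute.
    public static IReadOnlyList<(string Queue, bool Broadcast, MethodInfo Method)> Scan(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract
                        && t.GetCustomAttribute<DemoHandlerAttribute>() is not null)
            .SelectMany(t => t.GetMethods(BindingFlags.Instance | BindingFlags.Public))
            .Select(m => (Method: m, Attr: m.GetCustomAttribute<DemoAmqpHandlerAttribute>()))
            .Where(x => x.Attr is not null)
            .Select(x => (Queue: x.Attr!.Queue, Broadcast: x.Attr!.Broadcast, Method: x.Method))
            .ToList();
}
```

A startup routine could call `HandlerScanner.Scan(typeof(DemoOrderHandler).Assembly)` and open one subscription per returned entry.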
5. Use IMessageBus
In Outbox mode, MessageBusOutboxDecorator collects messages into PendingMsgCol (an AsyncLocal), and OutboxSaveChangesInterceptor writes them to base_outbox_msg in the same transaction when SaveChanges runs.
public class MyService
{
    private readonly IMessageBus _bus;
    public MyService(IMessageBus bus) => _bus = bus;
    public async Task PublishAsync(MyEvent evt) =>
        await _bus.PublishAsync("my_event", evt);
}
Detailed configuration
Full configuration example
{
  "RabbitMQ": {
    "HostName": "rabbit1, rabbit2, rabbit3",
    "UserName": "user",
    "Password": "pass",
    "VirtualHost": "/",
    "AutoCreateEnabled": true,
    "ConsumerDispatchConcurrency": 16,
    "NetworkRecoveryInterval": 10,
    "RequestedHeartbeat": 45,
    "ContinuationTimeout": 20,
    "Limit": {
      "ProducerMaxConnections": 4,
      "ProducerChannelMaxPerConnection": 10,
      "ConsumerMaxConnections": 4,
      "ConsumerChannelMaxPerConnection": 50,
      "MaxMessageSize": 65536,
      "MaxTps": 0,
      "BatchSize": 100,
      "BatchTimeoutMs": 50,
      "PublishConfirmTimeoutMs": 3000,
      "RetryCount": 3,
      "RetryBaseDelayMs": 1000,
      "PipelineCapacity": 5000
    },
    "Handlers": {
      "order_created": {
        "Concurrency": 4,
        "MaxRetryCount": 3,
        "RetryBaseDelayMs": 1000,
        "PrefetchMultiplier": 2,
        "DurableQueue": false,
        "QueueName": null
      }
    }
  }
}
For a detailed description of each option, see the Amqp module documentation.
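To make the `RetryBaseDelayMs` and `MaxRetryCount` settings concrete, here is a minimal sketch of one common exponential backoff schedule. The doubling formula, the 60-second cap, and the `RetryBackoff` helper are assumptions for illustration; the package's actual schedule is not documented here and may differ.

```csharp
using System;

public static class RetryBackoff
{
    // Assumed schedule: baseDelayMs * 2^(attempt - 1), capped at maxDelayMs.
    // "attempt" is the 1-based number of the retry about to be scheduled.
    public static TimeSpan DelayFor(int attempt, int baseDelayMs = 1000, int maxDelayMs = 60_000)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        double ms = baseDelayMs * Math.Pow(2, attempt - 1);
        return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelayMs));
    }

    // True while another retry is allowed; false means the message
    // would be handed to the dead-letter queue instead.
    public static bool ShouldRetry(int attempt, int maxRetryCount = 3) =>
        attempt <= maxRetryCount;
}
```

Under these assumptions, a base delay of 1000 ms with three retries gives waits of 1 s, 2 s, and 4 s before the message is dead-lettered.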
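Observability
Health check
A health check is registered automatically; visit /health to see the RabbitMQ status.
The health check is named "rabbitmq", is tagged ["rabbitmq", "messaging"], and reports three states: Healthy (all connections up) / Degraded (some connections down) / Unhealthy (all connections down).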
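The three-state rule above can be written as a small pure function. This is a sketch of the rule as stated, not the code inside `RabbitMQHealthCheck`; `PoolHealth` and `PoolHealthAggregator` are hypothetical names, and treating an empty pool as Unhealthy is an assumption.

```csharp
public enum PoolHealth { Healthy, Degraded, Unhealthy }

public static class PoolHealthAggregator
{
    // openConnections: connections currently open across the pools.
    // totalConnections: connections the pools are expected to hold.
    public static PoolHealth Aggregate(int openConnections, int totalConnections)
    {
        // Assumption: a pool with nothing configured or nothing open is Unhealthy.
        if (totalConnections <= 0 || openConnections <= 0) return PoolHealth.Unhealthy;
        return openConnections >= totalConnections ? PoolHealth.Healthy : PoolHealth.Degraded;
    }
}
```

For example, `Aggregate(3, 4)` yields `Degraded` because one of four connections is down.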
OpenTelemetry tracing
ActivitySource name: Galosys.Foundation.RabbitMQ
| Span | Trigger | Key tags |
|---|---|---|
| rabbitmq.publish | SendAsync / BroadcastAsync | messaging.destination (RoutingKey), W3C traceparent |
| rabbitmq.consume | Consumer callback | messaging.destination (RoutingKey), Consumer Tag |
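As an illustration of W3C traceparent propagation through message headers, the sketch below uses only `System.Diagnostics` from the base class library. `TraceparentHeaders` is a hypothetical helper, not the package's `RabbitMQTracing` class, and storing the value under a header literally named `traceparent` is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

public static class TraceparentHeaders
{
    public const string HeaderName = "traceparent";

    // Producer side: copy the started span's W3C id into the message headers.
    public static void Inject(Activity activity, IDictionary<string, object?> headers)
    {
        if (activity.IdFormat == ActivityIdFormat.W3C && activity.Id is not null)
            headers[HeaderName] = activity.Id; // "00-<trace-id>-<span-id>-<flags>"
    }

    // Consumer side: rebuild the parent context from the header.
    // AMQP header values often arrive as byte[], so both forms are handled.
    public static bool TryExtract(IDictionary<string, object?> headers, out ActivityContext parent)
    {
        parent = default;
        if (!headers.TryGetValue(HeaderName, out var raw)) return false;
        string? value = raw switch
        {
            string s => s,
            byte[] bytes => Encoding.UTF8.GetString(bytes),
            _ => null
        };
        return value is not null && ActivityContext.TryParse(value, null, out parent);
    }
}
```

A consumer could pass the extracted `ActivityContext` as the parent when starting its own span, so the publish and consume spans share one trace id.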
Metrics
Meter name: Galosys.Foundation.RabbitMQ
| Metric | Type | Description |
|---|---|---|
| rabbitmq.messages.published | Counter | Messages published |
| rabbitmq.messages.consumed | Counter | Messages consumed |
| rabbitmq.messages.rejected | Counter | Messages rejected |
| rabbitmq.messages.retried | Counter | Messages retried |
| rabbitmq.messages.publish_latency | Histogram | Publish latency (ms) |
| rabbitmq.messages.processing_duration | Histogram | Processing duration (ms) |
| rabbitmq.batch.size | Histogram | Batch size |
| rabbitmq.connections.active | ObservableGauge | Active connections |
| rabbitmq.channels.active | ObservableGauge | Active Channels |
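Instruments published under this meter name can be observed in-process with the BCL `MeterListener`, without an exporter. The sketch below sums counter measurements by instrument name; `CounterTap` is a hypothetical helper, and it assumes the counters report `long` values, which the table above does not state.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics.Metrics;

public sealed class CounterTap : IDisposable
{
    private readonly MeterListener _listener = new();
    private readonly ConcurrentDictionary<string, long> _totals = new();

    public CounterTap(string meterName)
    {
        // Subscribe only to instruments that belong to the requested meter.
        _listener.InstrumentPublished = (instrument, listener) =>
        {
            if (instrument.Meter.Name == meterName)
                listener.EnableMeasurementEvents(instrument);
        };
        // Assumption: measurements are long; other value types need their own callback.
        _listener.SetMeasurementEventCallback<long>(
            (instrument, value, tags, state) =>
                _totals.AddOrUpdate(instrument.Name, value, (_, sum) => sum + value));
        _listener.Start();
    }

    // Running total of all measurements seen for one instrument.
    public long Total(string instrumentName) =>
        _totals.TryGetValue(instrumentName, out var total) ? total : 0;

    public void Dispose() => _listener.Dispose();
}
```

Creating `new CounterTap("Galosys.Foundation.RabbitMQ")` at startup and reading `Total("rabbitmq.messages.published")` later is one way to check in a test that publishing is being counted.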
Core classes
| Class | Description |
|---|---|
| RabbitTemplate | Unified message template (Facade) that combines RabbitPublisher + RabbitConsumer |
| RabbitPublisher | Message publisher: serialization + direct channel send (channel reuse, Properties reuse, encoding, confirm tracking) |
| RabbitConsumer | Message consumer: semaphore-based throttling + exponential backoff retry + DLX/DLQ |
| RabbitBootstrapper | Bootstrapper: warms up the connection pools + scans Handlers + shuts down gracefully |
| ConnectionPool | Connection pool: round-robin Channel allocation + automatic recovery |
| IConnectionPool | Connection pool interface |
| ChannelLease | Channel lease (RAII; Dispose returns the Channel) |
| BatchPublishPipeline | Batch publish pipeline: bounded Channel + Publisher Confirms + retry (regular sends no longer use it) |
| PendingMessage | A message waiting in the pipeline to be published (with TCS confirmation) |
| TopologyCache | Topology cache: ConcurrentDictionary + rebuild on reconnect |
| RabbitMQMeter | OTel metrics (7 Counter/Histogram/Gauge instruments) |
| RabbitMQTracing | OTel tracing (W3C traceparent injection/extraction) |
| RabbitMQHealthCheck | Health check (aggregates producer/consumer pool state) |
| RabbitMessageBus | IMessageBus implementation (broadcast) |
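The round-robin allocation and dispose-to-return lease described for ConnectionPool and ChannelLease can be sketched generically. `RoundRobinPool<T>` and its nested `Lease` are hypothetical and only illustrate the pattern; the package's classes may be structured differently.

```csharp
using System;
using System.Threading;

public sealed class RoundRobinPool<T>
{
    private readonly T[] _items;
    private readonly SemaphoreSlim _permits;
    private int _next = -1;

    public RoundRobinPool(T[] items, int maxConcurrentLeases)
    {
        if (items.Length == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));
        _items = items;
        _permits = new SemaphoreSlim(maxConcurrentLeases, maxConcurrentLeases);
    }

    // Blocks while all permits are taken, then hands out the next item in rotation.
    public Lease Rent()
    {
        _permits.Wait();
        // The unsigned cast keeps the index valid after the counter wraps around.
        uint ticket = (uint)Interlocked.Increment(ref _next);
        return new Lease(this, _items[ticket % (uint)_items.Length]);
    }

    public sealed class Lease : IDisposable
    {
        private RoundRobinPool<T>? _owner;
        public T Item { get; }

        internal Lease(RoundRobinPool<T> owner, T item)
        {
            _owner = owner;
            Item = item;
        }

        // Releases the permit exactly once, even if Dispose is called twice.
        public void Dispose() => Interlocked.Exchange(ref _owner, null)?._permits.Release();
    }
}
```

Wrapping each rent in a `using` statement returns the permit as soon as the send completes, which is the same shape as the RAII lease in the table.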
Dependencies
- Galosys.Foundation.Amqp (abstraction layer)
- Galosys.Foundation.Core (IMessageBus, IObjectSerializer)
- RabbitMQ.Client (NuGet)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Galosys.Foundation.Amqp (>= 26.5.20.1)
  - Galosys.Foundation.Core (>= 26.5.20.1)
  - rabbitmq.client (>= 7.1.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 26.5.20.1 | 92 | 5/20/2026 |
| 26.5.19.1 | 93 | 5/19/2026 |
| 26.5.18.1 | 100 | 5/18/2026 |
| 26.5.15.1 | 99 | 5/15/2026 |
| 26.5.12.3 | 94 | 5/12/2026 |
| 26.5.12.2 | 96 | 5/12/2026 |
| 26.4.27.1-rc1 | 112 | 4/26/2026 |
| 26.4.25.1-rc1 | 101 | 4/25/2026 |
| 26.4.22.2-rc7 | 93 | 4/22/2026 |
| 26.4.22.2-rc6 | 94 | 4/22/2026 |
| 26.4.22.2-rc4 | 92 | 4/22/2026 |
| 26.4.22.2-rc3 | 92 | 4/22/2026 |
| 26.4.19.1-rc1 | 133 | 4/19/2026 |