Quick.RabbitMQPlus
1.0.5
See the version list below for details.
dotnet add package Quick.RabbitMQPlus --version 1.0.5
NuGet\Install-Package Quick.RabbitMQPlus -Version 1.0.5
<PackageReference Include="Quick.RabbitMQPlus" Version="1.0.5" />
paket add Quick.RabbitMQPlus --version 1.0.5
#r "nuget: Quick.RabbitMQPlus, 1.0.5"
// Install Quick.RabbitMQPlus as a Cake Addin #addin nuget:?package=Quick.RabbitMQPlus&version=1.0.5 // Install Quick.RabbitMQPlus as a Cake Tool #tool nuget:?package=Quick.RabbitMQPlus&version=1.0.5
1、🍟Quick.RabbitMQPlus使用说明
该组件是基于RabbitMQ.Client
和Furion
组件进行封装使用的,目的是为了结合.Net Core更简单的使用RabbitMQ!!!
功能说明:
- 支持
发布订阅模式
、路由模式
,通配符模式
和Headers属性模式
; - 可根据配置文件读取RabbitMQ连接的各个配置;
- 可根据实体定义的特性发布和订阅消息;
- 支持配置将多个队列绑定到交换机;
- 一个消费端支持可以同时消费多个多列的消息等。
2、🍖安装
安装命令如下所示:
Install-Package Quick.RabbitMQPlus
3、🧀生产端
3.1、🥞配置appsettings.json
在appsettings.json
配置文件中创建节点QuickRabbitMQPlus
>QuickRabbitMQPlusConfigs
,QuickRabbitMQPlusConfigs为数组类型(即可配置多个RabbitMQ服务地址),具体配置如下所示:
{
"QuickRabbitMQPlus": {
"QuickRabbitMQPlusConfigs": [
{
"ConnId": 1,
"UserName": "admin",
"Password": "123456",
"HostName": "192.168.3.1",
"Port": 5672,
"ExchangeName": "TestExchangeName",
"QueueNames": [ "TestRabbitMQName1", "TestRabbitMQName2" ],
"RouteKey": "TestRouteKey",
"ExchangeDurable": true,
"QueueDurable": true,
"MessageDurable": true
},
{
"ConnId": 2,
"UserName": "admin",
"Password": "123456",
"HostName": "192.168.3.2",
"Port": 5672,
"ExchangeName": "TestExchangeName",
"QueueNames": [ "TestRabbitMQName1", "TestRabbitMQName2" ],
"RouteKey": "TestRouteKey",
"ExchangeDurable": true,
"QueueDurable": true,
"MessageDurable": true
}
]
}
}
配置说明(消费端通用):
属性名称 | 属性说明 | 是否必填 | 备注 |
---|---|---|---|
ConnId | 连接配置Id,唯一Id | √ | 需要和QuickRabbitMQPlusConfig特性中的ConnId一致 |
ExchangeType | 交换机类型(fanout:发布订阅模式、direct:路由模式、topic:通配符模式、headers:属性匹配模式) | 默认为fanout,目前使用最多的模式 | |
UserName | RabbitMQ连接账户 | √ | |
Password | RabbitMQ连接密码 | √ | |
HostName | RabbitMQ连接IP | √ | |
Port | RabbitMQ连接端口 | 不填就是默认端口5672 | |
ExchangeName | 交换机名称 | √ | |
QueueNames | 队列名称集合(与交换机ExchangeName进行绑定),如果同时设置了实体特性的队列名称集合,那么会优先采用实体的队列集合 | √ | 此处为集合,目的是在发布消息时将消息存储到该队列集合中去 |
RouteKey | 路由名称(或通配符名称) | 需要注意的是,当ExchangeType="direct/topic"时,RouteKey需要有值(但不是非必须)。如果RouteKey为空,默认会采用交换机的名称。如果实体特性也设置了该参数,会优先采用实体的特性值。 | |
ExchangeDurable | 交换机是否持久化,默认为true | 如果采用默认的设置,配置文件可以不要该属性 | |
QueueDurable | 队列是否持久化,默认为true | 如果采用默认的设置,配置文件可以不要该属性 | |
MessageDurable | 消息是否持久化,默认为true | 如果采用默认的设置,配置文件可以不要该属性 |
3.2、🍞配置Program.cs
由于我们使用的是Furion,因此,我们可在程序启动文件中配置如下代码(具体可参考Furion入门指南),目的是注册配置选项QuickRabbitMQPlusOptions:
Serve.Run(RunOptions.DefaultSilence.ConfigureBuilder(builder =>
{
//注册RabbitMQ连接配置对象
builder.Services.AddConfigurableOptions<QuickRabbitMQPlusOptions>();
}).Configure(app =>
{
}));
3.3、🧀定义发送消息实体
如下所示我们可以定义一个消息实体:
namespace Quick.RabbitMQPlus.Publisher
{
[QuickRabbitMQPlusConfig(
connId: 2,
exchangeType: "fanout",
exchangeName: "TestExchangeName",
queueName: "TestRabbitMQName1,TestRabbitMQName2",
routeKey: "TestExchangeName",
exchangeDurable: 1,
queueDurable: 1,
messageDurable: 1)]
public class TestRabbitMQModel
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}
实体特性配置说明(消费端通用):
属性名称 | 属性说明 | 是否必填 | 备注 |
---|---|---|---|
connId | 连接配置Id | √ | 需要和配置文件中的ConnId一致 |
exchangeType | 交换机类型(fanout:发布订阅模式、direct:路由模式、topic:通配符模式、headers:属性匹配模式) | 默认为fanout,目前使用最多的模式 | |
exchangeName | 交换机名称 | 如果实体特性没有定义该值,那么会采用配置文件中的值 | |
queueName | 队列名称(多个队列名称请使用英文逗号,分隔) | 如果同时设置了实体特性的队列名称和配置中的_QueueNames_属性,那么会优先采用实体的队列名称 | |
routeKey | 路由名称(或通配符名称) | 需要注意的是,当ExchangeType="direct/topic"时,RouteKey需要有值(但不是非必须)。如果RouteKey为空,默认会采用交换机的名称。如果实体特性也设置了该参数,会优先采用实体的特性值。如果该实体特性没有设置,会采用配置文件中的值。 | |
exchangeDurable | 交换机是否持久化,默认为2(不设置) | 默认为2(0:不持久化、1:持久化、2:不设置【以配置文件为主】) | |
queueDurable | 队列是否持久化,默认为2(不设置) | 默认为2(0:不持久化、1:持久化、2:不设置【以配置文件为主】) | |
messageDurable | 消息是否持久化,默认为2(不设置) | 默认为2(0:不持久化、1:持久化、2:不设置【以配置文件为主】) |
3.4、🥐发送消息Demo
如下所示为具体的发送消息代码:
发送单条消息:
//定义发送对象
var sendInstance = QuickRabbitMQPlusInstance<TestRabbitMQModel>.Instance();
//发送10条数据
for (int i = 0; i < 10; i++)
{
var msgModel = new TestRabbitMQModel
{
UserId = rand.Next(1, 9999),
UserName = "Quick" + (i + 1),
UserAge = rand.Next(20, 80),
CreateTime = DateTime.Now
};
var sendRet = await sendInstance.Send(msgModel);
if (sendRet.Item1)
{
//发送成功
//消息发送完成后,关闭通道(根据实际情况自行决定要不要调用关闭方法)
sendInstance.Close();
}
else
{
//发送失败
var errMsg = $"失败原因:{sendRet.Item2}";
}
//间隔2秒发送一次
await Task.Delay(2000);
}
发送多条消息:
var sendList = new List<TestRabbitMQModel>{
new TestRabbitMQModel(),
new TestRabbitMQModel()
};
var sendRet = await sendInstance.Send(sendList);
4、🥪消费端
4.1、🍝配置appsettings.json
在appsettings.json
配置文件中创建节点QuickRabbitMQPlus
>QuickRabbitMQPlusConfigs
,QuickRabbitMQPlusConfigs为数组类型(即可配置多个RabbitMQ服务地址),具体配置如下所示:
{
"QuickRabbitMQPlus": {
"QuickRabbitMQPlusConfigs": [
{
"ConnId": 1,
"ExchangeType": "fanout",
"UserName": "admin",
"Password": "123456",
"HostName": "192.168.3.1",
"Port": 5672,
"ExchangeName": "TestExchangeName",
//"QueueNames": [ "TestRabbitMQName1", "TestRabbitMQName2" ],//此处注释掉的原因是
"RouteKey": "TestRouteKey",
//"ExchangeDurable": true,
//"QueueDurable": true,
//"MessageDurable": true
},
{
"ConnId": 1,
"ExchangeType": "fanout",
"UserName": "admin",
"Password": "123456",
"HostName": "192.168.3.2",
"Port": 5672,
"ExchangeName": "TestExchangeName",
//"QueueNames": [ "TestRabbitMQName1", "TestRabbitMQName2" ],
"RouteKey": "TestRouteKey",
//"ExchangeDurable": true,
//"QueueDurable": true,
//"MessageDurable": true
}
]
}
}
配置说明:
配置说明请参见生产端
的具体说明,需要注意的是,如果消费端中的QueueNames属性或者实体特性queueName设置了多个队列,就代表这一个消费端同时接收多个队列的消息
4.2、🌮配置Program.cs
由于我们使用的是Furion,因此,我们可在程序启动文件中配置如下代码(具体可参考Furion入门指南),目的是注册配置选项QuickRabbitMQPlusOptions:
Serve.Run(RunOptions.DefaultSilence.ConfigureBuilder(builder =>
{
//注册RabbitMQ连接配置对象
builder.Services.AddConfigurableOptions<QuickRabbitMQPlusOptions>();
}).Configure(app =>
{
}));
4.3、🧆定义接收消息实体
如下所示我们可以定义3个消息实体(第一个用于接收队列TestRabbitMQName1
的消息,第二个用于接收队列TestRabbitMQName2
的消息,第三个用于接收队列TestRabbitMQName1
和TestRabbitMQName2
):
namespace Quick.RabbitMQPlus.Consumer
{
[QuickRabbitMQPlusConfig(connId: 2, queueName: "TestRabbitMQName1")]
public class TestRabbitMQModel1
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}
namespace Quick.RabbitMQPlus.Consumer
{
[QuickRabbitMQPlusConfig(connId: 2, queueName: "TestRabbitMQName2")]
public class TestRabbitMQModel2
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}
namespace Quick.RabbitMQPlus.Consumer
{
[QuickRabbitMQPlusConfig(connId: 2, queueName: "TestRabbitMQName1,TestRabbitMQName2")]
public class TestRabbitMQModel3
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}
实体特性配置说明:
实体特性配置说明请参见生产端
的具体说明。
4.4、🍨接收消息Demo
定义两个消费端,一个消费端消费一个队列,具体的接收消息代码如下所示(接收单条消息):
//接收队列1的消息
var retRec1 = await QuickRabbitMQPlusInstance<TestRabbitMQModel1>.Instance().Receive(async (data, msg) =>
{
await Task.Delay(1000);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"\r\n队列1消息:{msg}");
//返回true代表业务逻辑处理成功,会告知MQ这条消息已经接收成功,会从MQ队列中删除
//返回false代表业务逻辑处理失败,会告知MQ这条消息没有处理成功,则MQ会继续推送这条消息
return true;
}, 1);
if (!retRec1.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n队列1接收失败:{retRec1.Item2}");
}
//接收队列2的消息
var retRec2 = await QuickRabbitMQPlusInstance<TestRabbitMQModel2>.Instance().Receive(async (data, msg) =>
{
await Task.Delay(2500);
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine($"\r\n队列2消息:{msg}");
//返回true代表业务逻辑处理成功,会告知MQ这条消息已经接收成功,会从MQ队列中删除
//返回false代表业务逻辑处理失败,会告知MQ这条消息没有处理成功,则MQ会继续推送这条消息
return true;
});
if (!retRec2.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n队列2接收失败:{retRec2.Item2}");
}
定义一个消费端,同时消费两个队列,具体的接收消息代码如下所示(接收单条消息):
//接收队列1的消息
var retRec = await QuickRabbitMQPlusInstance<TestRabbitMQModel3>.Instance().Receive(async (data, msg) =>
{
await Task.Delay(1000);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"\r\n队列1、2消息:{msg}");
//返回true代表业务逻辑处理成功,会告知MQ这条消息已经接收成功,会从MQ队列中删除
//返回false代表业务逻辑处理失败,会告知MQ这条消息没有处理成功,则MQ会继续推送这条消息
return true;
}, 1);
if (!retRec.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n队列1、2接收失败:{retRec.Item2}");
}
如果需要接收多条消息,请使用Receives
方法:
//接收队列1的消息
var retRec = await QuickRabbitMQPlusInstance<TestRabbitMQModel3>.Instance().Receives(async (dataList, msg) =>
{
//此处的dataList为List<TestRabbitMQModel3>
return true;
}, 1);
4.5、🍱预览效果
<div style="background:#000;padding:10px;"> <div><font color="DarkCyan">####消费端已启动####</div></br> <div><font color="Green">队列1消息:{"UserId":8739,"UserName":"CCCC1","UserAge":40,"CreateTime":"2022-09-06 10:44:37"}</div></br> <div><font color="Magenta">队列2消息:{"UserId":8739,"UserName":"CCCC1","UserAge":40,"CreateTime":"2022-09-06 10:44:37"}</div></br> <div><font color="Green">队列1消息:{"UserId":8646,"UserName":"CCCC2","UserAge":54,"CreateTime":"2022-09-06 10:44:39"}</div></br> <div><font color="Magenta">队列2消息:{"UserId":8646,"UserName":"CCCC2","UserAge":54,"CreateTime":"2022-09-06 10:44:39"}</div></br> <div><font color="Magenta">队列2消息:{"UserId":1966,"UserName":"CCCC3","UserAge":21,"CreateTime":"2022-09-06 10:44:41"}</div></br> <div><font color="Green">队列1消息:{"UserId":1966,"UserName":"CCCC3","UserAge":21,"CreateTime":"2022-09-06 10:44:41"}</div></br> <div><font color="Magenta">队列2消息:{"UserId":8349,"UserName":"CCCC4","UserAge":25,"CreateTime":"2022-09-06 10:44:43"}</div></br> <div><font color="Green">队列1消息:{"UserId":8349,"UserName":"CCCC4","UserAge":25,"CreateTime":"2022-09-06 10:44:43"}</div></br> <div><font color="Magenta">队列2消息:{"UserId":3770,"UserName":"CCCC5","UserAge":31,"CreateTime":"2022-09-06 10:44:45"}</div></br> <div><font color="Green">队列1消息:{"UserId":3770,"UserName":"CCCC5","UserAge":31,"CreateTime":"2022-09-06 10:44:45"}</div></br> </div>
5、Quick.RabbitMQPlus方法
首先声明Quick.RabbitMQPlus的实例化对象:
var mqInstance = QuickRabbitMQPlusInstance<T>.Instance();
其次就可以使用使用该实例化对象中的发送和接收方法了,具体说明如下所示:
方法名称 方法说明 方法参数 备注 Send 发送消息方法,支持单条消息和多条消息的发送 (data) 方法参数data可以为T或List<T> Receive 接收消息(单条消息) (received, prefetchCount) 方法的第一个参数为回调函数,该回调函数包含2个返回数据(第一个为T,第二个为T对应的字符串),并且该回调函数需要返回bool类型(以便告诉RabbitMQ服务该消息是否处理成功);<br />方法的第二个参数为设置RabbitMQ一次最多推送多少条消息给消费者,默认为10 Receives 接收消息(多条消息) (received, prefetchCount) 方法的第一个参数为回调函数,该回调函数包含2个返回数据(第一个为List<T>,第二个为List<T>对应的字符串),并且该回调函数需要返回bool类型(以便告诉RabbitMQ服务该消息是否处理成功);<br />方法的第二个参数为设置RabbitMQ一次最多推送多少条消息给消费者,默认为10 Close 关闭连接 注意,如果调用了该方法,又想重新使用实例化对象mqInstance发送或接收消息,需要重新实例化该对象。
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net6.0
- Furion (>= 4.4.0)
- Newtonsoft.Json (>= 13.0.1)
- RabbitMQ.Client (>= 6.4.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.