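RabbitMQ Connection Pool + Spring Boot Implementation
With the official RabbitMQ Java client, every Connection object opens a TCP connection, and creating and tearing down TCP connections is expensive. A connection pool avoids that cost by creating Connections ahead of time, so each use borrows one from the pool instead of opening a new one.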
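The borrow-and-return idea can be sketched with nothing but the JDK. This is a minimal illustration, not the pool used later in this post: `SimplePool`, `borrow`, `release` and `idleCount` are hypothetical names, and the type parameter `T` stands in for an expensive resource such as a Connection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool; T stands in for an expensive resource such as a Connection. */
final class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        // Pay the creation cost once, up front.
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Waits up to timeoutMillis for an idle resource; returns null on timeout or interruption. */
    T borrow(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
    }

    /** Hands a borrowed resource back so other callers can reuse it. */
    void release(T resource) {
        idle.offer(resource);
    }

    /** Number of resources currently available for borrowing. */
    int idleCount() {
        return idle.size();
    }
}
```

Callers would normally pair `borrow` with `release` in a `try`/`finally` block so that a resource is returned even when the work in between fails. A production pool would also need to validate and replace broken connections, which this sketch leaves out.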
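With Spring Boot, RabbitTemplate is rather rigid and could not meet this project's need to create queues dynamically and send messages to them.
Solution
- Build a RabbitMQ connection pool management tool. I found some connection pool examples online, then modified and consolidated them.
- Integrate it with Spring Boot.
Straight to the code.
pom.xml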
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>blog</groupId>
    <artifactId>rabbitmq-pool</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.3.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- HtRabbitTemplate also uses com.rabbitmq.http.client.Client, which lives in the
             com.rabbitmq:http-client artifact; add it here with a version that suits your setup. -->
    </dependencies>
</project>
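HtRabbitTemplate.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.QueueInfo;
import org.springframework.amqp.core.ExchangeTypes;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

// ConnectionWrapper comes from the connection pool linked at the end of this post.
public class HtRabbitTemplate {
    private ConnectionWrapper connectionWrapper;
    // HTTP client for the RabbitMQ management API
    private Client client;
    public HtRabbitTemplate(String url, String username, String password, String managerUrl, String managerUserName, String managerPassword) {
        connectionWrapper = new ConnectionWrapper(url, username, password);
        try {
            client = new Client(managerUrl, managerUserName, managerPassword);
        } catch (MalformedURLException | URISyntaxException e) {
            // Fail fast instead of leaving client null and hitting a NullPointerException later.
            throw new IllegalArgumentException("Invalid management URL: " + managerUrl, e);
        }
    }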
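    /**
     * Sends a message through a durable direct exchange.
     * @param queueName queue name
     * @param content message body
     * @param exchange exchange name
     * @param routingKey routing key
     * @return whether the broker confirmed the message
     */
    public boolean sendMessage(String queueName, String content, String exchange, String routingKey) {
        Connection connection = connectionWrapper.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.exchangeDeclare(exchange, ExchangeTypes.DIRECT, true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchange, routingKey);
            channel.confirmSelect();
            // Publish to the declared exchange so that the binding above is actually used.
            channel.basicPublish(exchange, routingKey, com.rabbitmq.client.MessageProperties.PERSISTENT_BASIC, content.getBytes("UTF-8"));
            return channel.waitForConfirms();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Close the channel first, otherwise channels pile up on the pooled connection.
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException | java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }
            }
            connectionWrapper.releaseConnection(connection);
        }
        return false;
    }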
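    /**
     * Sends a message straight to a queue through the default exchange.
     * @param queueName queue name
     * @param content message body
     * @return whether the broker confirmed the message
     */
    public boolean sendMessage(String queueName, String content) {
        Connection connection = connectionWrapper.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
            channel.confirmSelect();
            // The default exchange ("") routes by queue name.
            channel.basicPublish("", queueName, com.rabbitmq.client.MessageProperties.PERSISTENT_BASIC, content.getBytes("UTF-8"));
            return channel.waitForConfirms();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Close the channel first, otherwise channels pile up on the pooled connection.
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException | java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }
            }
            connectionWrapper.releaseConnection(connection);
        }
        return false;
    }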
    /**
     * Checks whether a queue is missing from the default virtual host "/".
     * @param queueName queue name
     * @return true if the queue does not exist
     */
    public boolean isNotExist(String queueName) {
        QueueInfo qi = client.getQueue("/", queueName);
        return (qi == null);
    }
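    /**
     * Lists the queues whose name contains the given text.
     * Note that despite the parameter name, the match is "contains", not "starts with".
     * @param prefix text to look for in the queue name
     * @return matching queues
     */
    public List<QueueInfo> getQueueList(String prefix) {
        List<QueueInfo> list = client.getQueues();
        List<QueueInfo> matched = new ArrayList<>();
        for (QueueInfo qi : list) {
            if (qi.getName().contains(prefix)) {
                matched.add(qi);
            }
        }
        return matched;
    }
    public ConnectionWrapper getConnectionWrapper() {
        return connectionWrapper;
    }
}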
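The four-argument sendMessage declares a direct exchange and binds the queue to it with a routing key, while the two-argument version relies on the default exchange. How those two routes differ can be sketched with a toy in-memory model. This is only an illustration of the routing rule, not the RabbitMQ client API; `DirectRoutingModel` and the names used with it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of direct-exchange routing; not the RabbitMQ client API. */
final class DirectRoutingModel {
    // exchange name -> routing key -> bound queue names
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();
    private final List<String> queues = new ArrayList<>();

    void declareQueue(String queue) {
        if (!queues.contains(queue)) {
            queues.add(queue);
        }
    }

    void bind(String queue, String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>())
                .computeIfAbsent(routingKey, k -> new ArrayList<>())
                .add(queue);
    }

    /** Returns the queues that would receive a message published with these arguments. */
    List<String> route(String exchange, String routingKey) {
        if (exchange.isEmpty()) {
            // Default exchange: every queue is implicitly bound under its own name.
            return queues.contains(routingKey)
                    ? Collections.<String>singletonList(routingKey)
                    : Collections.<String>emptyList();
        }
        // Named direct exchange: the routing key must match a binding key exactly.
        return bindings
                .getOrDefault(exchange, Collections.<String, List<String>>emptyMap())
                .getOrDefault(routingKey, Collections.<String>emptyList());
    }
}
```

In this model, a queue named `orders` bound to exchange `shop` with key `order.created` is reached either by publishing to `shop` with `order.created`, or by publishing to the default exchange with the queue name `orders`. Publishing to `shop` with the queue name as the key reaches nothing.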
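RabbitPoolConfig.java
import com.rabbitmq.http.client.Client;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.MalformedURLException;
import java.net.URISyntaxException;

// @Configuration is required for Spring to pick up the @Bean methods below.
@Configuration
public class RabbitPoolConfig {
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.host}")
    private String url;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.management.host}")
    private String managerUrl;
    @Value("${rabbitmq.management.account}")
    private String managerUserName;
    @Value("${rabbitmq.management.password}")
    private String managerPassword;
    @Bean
    public HtRabbitTemplate htRabbitTemplate() {
        return new HtRabbitTemplate(url, username, password, managerUrl, managerUserName, managerPassword);
    }
    @Bean
    public Client client() {
        try {
            return new Client(managerUrl, managerUserName, managerPassword);
        } catch (MalformedURLException | URISyntaxException e) {
            // Fail at startup rather than registering a null bean.
            throw new IllegalStateException("Invalid management URL: " + managerUrl, e);
        }
    }
}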
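RabbitPoolConfig reads six property keys through @Value. As a rough sketch, the keys can be listed and checked with plain `java.util.Properties`. The class `RabbitPoolSettings` is hypothetical, and the sample values (a local broker, the default guest account, the management API on port 15672) are assumptions to be replaced with real settings. In particular, the format expected for `spring.rabbitmq.host` depends on the ConnectionWrapper implementation, which is not shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper that reports which keys read by RabbitPoolConfig are missing. */
final class RabbitPoolSettings {
    // The six keys referenced by the @Value placeholders in RabbitPoolConfig.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "spring.rabbitmq.host",
            "spring.rabbitmq.username",
            "spring.rabbitmq.password",
            "rabbitmq.management.host",
            "rabbitmq.management.account",
            "rabbitmq.management.password");

    // Placeholder values for a local broker (assumptions, not taken from the original project).
    static final String SAMPLE = String.join("\n",
            "spring.rabbitmq.host=localhost",
            "spring.rabbitmq.username=guest",
            "spring.rabbitmq.password=guest",
            "rabbitmq.management.host=http://localhost:15672/api/",
            "rabbitmq.management.account=guest",
            "rabbitmq.management.password=guest");

    /** Parses application.properties-style text. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties", e);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

The lines in `SAMPLE` are in the same `key=value` form that would go into `application.properties`.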
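This example mainly covers sending messages to queues and retrieving the list of queues.
- Spring Boot integration source download: http://download.csdn.net/download/u013623958/10249930
- RabbitMQ connection pool download: http://download.csdn.net/download/yinch2/10129047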
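This article is licensed under the Creative Commons Attribution 4.0 International License. Unless marked as reposted or attributed to another source, articles on this site are original work or translations by this site; please credit the author before reposting. Last edited: 2021/06/22 07:46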