RabbitMQ连接池+SpringBoot实现

/ MQ / 没有评论 / 2251浏览

RabbitMQ连接池+SpringBoot实现

在使用RabbitMQ官方的Client时,Connection对象创建的是TCP连接,TCP连接的创建和销毁本身就是很耗时。因此需要使用连接池技术预生成Connection,每次使用都从连接池中获取Connection。

在使用SpringBoot时,RabbitTemplate比较死板,不能够满足项目中动态创建队列并发送的需求。

解决方案

  1. 创建RabbitMQ连接池管理工具,在网上找到一些关于连接池的示例,在此之上进行了修改与整合。
  2. 整合springboot

直接上代码

pop.xml

<project>
    <groupId>blog</groupId>
    <artifactId>rabbitmq,pool</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.3.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

</project>
HtRabbitTemplate.java

public class HtRabbitTemplate {

    private ConnectionWrapper connectionWrapper;

    private Client client;

    public HtRabbitTemplate(String url, String username, String password,String managerUrl,String managerUserName,String managerPssword){
        connectionWrapper = new ConnectionWrapper(url,username,password);
        try {
            client = new Client(managerUrl, managerUserName, managerPssword);
        } catch (MalformedURLException|URISyntaxException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送消息
     * @param queueName 队列名称
     * @param content 内容
     * @param exchange 交换器
     * @param routingKey 路由id
     * @return 是否成功
     */
    public boolean sendMessage(String queueName,String content,String exchange,String routingKey){
        Connection connection = connectionWrapper.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchange, ExchangeTypes.DIRECT,true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName,exchange,routingKey);
            channel.confirmSelect();
            channel.basicPublish("", queueName, com.rabbitmq.client.MessageProperties.PERSISTENT_BASIC, content.getBytes("UTF-8"));
            return channel.waitForConfirms();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            connectionWrapper.releaseConnection(connection);
        }
        return false;
    }

    /**
     * 发送消息队列
     * @param queueName 队列名称
     * @param content 内容
     * @return 是否成功
     */
    public boolean sendMessage(String queueName,String content){
        Connection connection = connectionWrapper.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
            channel.confirmSelect();
            channel.basicPublish("", queueName, com.rabbitmq.client.MessageProperties.PERSISTENT_BASIC, content.getBytes("UTF-8"));
            return channel.waitForConfirms();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            connectionWrapper.releaseConnection(connection);
        }
        return false;
    }


    /**
     * 判断队列是否存在
     * @param queueName 队列名称
     * @return 是否存在
     */
    public boolean isNotExist(String queueName){
        QueueInfo qi = client.getQueue("/",queueName);
        return (qi==null);
    }

    public List<QueueInfo> getQueueList(String prefix){
        List<QueueInfo> list = client.getQueues();
        List<QueueInfo> _list = new ArrayList<>();
        for (QueueInfo qi:list) {
            if(qi.getName().contains(prefix)){
                _list.add(qi);
            }
        }
        return _list;
    }


    public ConnectionWrapper getConnectionWrapper() {
        return connectionWrapper;
    }
}

RabbitPoolConfig.java

public class RabbitPoolConfig {
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.host}")
    private String url;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.management.host}")
    private String managerUrl;
    @Value("${rabbitmq.management.account}")
    private String managerUserName;
    @Value("${rabbitmq.management.password}")
    private String managerPssword;


    @Bean
    public HtRabbitTemplate htRabbitTemplate(){

        return new HtRabbitTemplate(url, username, password,managerUrl,managerUserName,managerPssword);
    }

    @Bean
    public Client client(){
        try {
            return new Client(managerUrl,managerUserName,managerPssword);
        } catch (MalformedURLException|URISyntaxException e) {
            e.printStackTrace();
        }
        return null;
    }

本地示例主要实现了消息队列的发送、队列列表的获取等功能。