Springboot RabbitMq源码解析之配置类

/ MQ / 没有评论 / 1672浏览

Springboot RabbitMq源码解析之配置类

1. RabbitAutoConfiguration类

RabbitAutoConfiguration类是springboot的自动配置类。

/**
 * Auto-configuration entry point for RabbitMQ support.
 *
 * <p>As declared by the annotations below, it is conditional on
 * {@code RabbitTemplate} and {@code Channel} being on the classpath, enables
 * binding of {@code RabbitProperties}, and imports
 * {@code RabbitAnnotationDrivenConfiguration}.
 */
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
  // Implementation omitted here for brevity.
}

我们从类上的注解可以看出,RabbitAutoConfiguration通过EnableConfigurationProperties注解注入了RabbitProperties类以识别rabbit的相关配置,并且通过import注解注入了另一个rabbit的配置类RabbitAnnotationDrivenConfiguration。另外,RabbitAutoConfiguration包含的3个内部类也都是配置类。

1.1 RabbitConnectionFactoryCreator

可以看出,其基于rabbit的配置文件内容在用户没有自定义的情况下初始化了CachingConnectionFactory类作为rabbit默认的ConnectionFactory。

/**
 * Contributes the default {@link CachingConnectionFactory}.
 *
 * <p>The configuration is conditional on no {@link ConnectionFactory} bean being
 * defined already, so a user-supplied connection factory takes precedence.
 */
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {

  /**
   * Creates the caching connection factory and applies the address, publisher
   * and cache settings taken from {@link RabbitProperties}.
   *
   * @param properties the bound RabbitMQ properties
   * @return the configured caching connection factory
   * @throws Exception if the underlying connection factory bean fails to initialise
   */
  @Bean
  public CachingConnectionFactory rabbitConnectionFactory(
    RabbitProperties properties) throws Exception {
    PropertyMapper mapper = PropertyMapper.get();
    RabbitConnectionFactoryBean factoryBean = getRabbitConnectionFactoryBean(properties);
    CachingConnectionFactory cachingFactory = new CachingConnectionFactory(
      factoryBean.getObject());
    mapper.from(properties::determineAddresses).to(cachingFactory::setAddresses);
    mapper.from(properties::isPublisherConfirms)
      .to(cachingFactory::setPublisherConfirms);
    mapper.from(properties::isPublisherReturns)
      .to(cachingFactory::setPublisherReturns);
    applyChannelCache(mapper, properties.getCache().getChannel(), cachingFactory);
    applyConnectionCache(mapper, properties.getCache().getConnection(),
      cachingFactory);
    return cachingFactory;
  }

  /** Copies the non-null channel cache settings (size, checkout timeout) onto the target. */
  private void applyChannelCache(PropertyMapper mapper,
    RabbitProperties.Cache.Channel channelCache, CachingConnectionFactory target) {
    mapper.from(channelCache::getSize).whenNonNull()
      .to(target::setChannelCacheSize);
    // The checkout timeout is a Duration; the factory is given milliseconds.
    mapper.from(channelCache::getCheckoutTimeout).whenNonNull()
      .as(Duration::toMillis).to(target::setChannelCheckoutTimeout);
  }

  /** Copies the non-null connection cache settings (mode, size) onto the target. */
  private void applyConnectionCache(PropertyMapper mapper,
    RabbitProperties.Cache.Connection connectionCache,
    CachingConnectionFactory target) {
    mapper.from(connectionCache::getMode).whenNonNull().to(target::setCacheMode);
    mapper.from(connectionCache::getSize).whenNonNull()
      .to(target::setConnectionCacheSize);
  }

  /**
   * Builds and initialises the {@link RabbitConnectionFactoryBean} carrying the
   * host, credentials, heartbeat, SSL and connection timeout settings.
   *
   * @param properties the bound RabbitMQ properties
   * @return the initialised factory bean
   * @throws Exception if {@code afterPropertiesSet()} fails
   */
  private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(
    RabbitProperties properties) throws Exception {
    PropertyMapper mapper = PropertyMapper.get();
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    mapper.from(properties::determineHost).whenNonNull().to(factoryBean::setHost);
    mapper.from(properties::determinePort).to(factoryBean::setPort);
    mapper.from(properties::determineUsername).whenNonNull()
      .to(factoryBean::setUsername);
    mapper.from(properties::determinePassword).whenNonNull()
      .to(factoryBean::setPassword);
    mapper.from(properties::determineVirtualHost).whenNonNull()
      .to(factoryBean::setVirtualHost);
    // The heartbeat is a Duration; the factory bean is given whole seconds.
    mapper.from(properties::getRequestedHeartbeat).whenNonNull()
      .asInt(Duration::getSeconds).to(factoryBean::setRequestedHeartbeat);
    RabbitProperties.Ssl sslSettings = properties.getSsl();
    if (sslSettings.isEnabled()) {
      applySsl(mapper, sslSettings, factoryBean);
    }
    // The connection timeout is a Duration; the factory bean is given milliseconds.
    mapper.from(properties::getConnectionTimeout).whenNonNull()
      .asInt(Duration::toMillis).to(factoryBean::setConnectionTimeout);
    factoryBean.afterPropertiesSet();
    return factoryBean;
  }

  /** Switches SSL on and transfers the algorithm, key store and trust store settings. */
  private void applySsl(PropertyMapper mapper, RabbitProperties.Ssl sslSettings,
    RabbitConnectionFactoryBean target) {
    target.setUseSSL(true);
    mapper.from(sslSettings::getAlgorithm).whenNonNull()
      .to(target::setSslAlgorithm);
    mapper.from(sslSettings::getKeyStoreType).to(target::setKeyStoreType);
    mapper.from(sslSettings::getKeyStore).to(target::setKeyStore);
    mapper.from(sslSettings::getKeyStorePassword)
      .to(target::setKeyStorePassphrase);
    mapper.from(sslSettings::getTrustStoreType).to(target::setTrustStoreType);
    mapper.from(sslSettings::getTrustStore).to(target::setTrustStore);
    mapper.from(sslSettings::getTrustStorePassword)
      .to(target::setTrustStorePassphrase);
  }
}

1.2 RabbitTemplateConfiguration

基于ConnectionFactory,在用户没有自定义的场景下,生成默认的RabbitTemplate和AmqpAdmin。因此,需要通过import引入RabbitConnectionFactoryCreator配置类。

/**
 * Contributes the default {@link RabbitTemplate} and {@link AmqpAdmin} beans.
 *
 * <p>Imports {@link RabbitConnectionFactoryCreator} so that a
 * {@link ConnectionFactory} is available to build them from.
 */
@Configuration
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {

  private final ObjectProvider<MessageConverter> messageConverter;

  private final RabbitProperties properties;

  /**
   * @param messageConverter provider for an optional message converter
   * @param properties the bound RabbitMQ properties
   */
  public RabbitTemplateConfiguration(
    ObjectProvider<MessageConverter> messageConverter,
    RabbitProperties properties) {
    this.messageConverter = messageConverter;
    this.properties = properties;
  }

  /**
   * Creates the default {@link RabbitTemplate}. Registered only when there is a
   * single candidate {@link ConnectionFactory} and no {@link RabbitTemplate}
   * bean is defined yet.
   *
   * @param connectionFactory the connection factory the template sends through
   * @return the configured template
   */
  @Bean
  @ConditionalOnSingleCandidate(ConnectionFactory.class)
  @ConditionalOnMissingBean(RabbitTemplate.class)
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    PropertyMapper mapper = PropertyMapper.get();
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    // Only override the template's converter when a unique one is available.
    MessageConverter uniqueConverter = this.messageConverter.getIfUnique();
    if (uniqueConverter != null) {
      rabbitTemplate.setMessageConverter(uniqueConverter);
    }
    rabbitTemplate.setMandatory(determineMandatoryFlag());
    RabbitProperties.Template templateProperties = this.properties.getTemplate();
    RabbitProperties.Retry retryProperties = templateProperties.getRetry();
    if (retryProperties.isEnabled()) {
      rabbitTemplate.setRetryTemplate(
        createRetryTemplate(templateProperties.getRetry()));
    }
    // Timeouts are Durations; the template is given milliseconds.
    mapper.from(templateProperties::getReceiveTimeout).whenNonNull()
      .as(Duration::toMillis).to(rabbitTemplate::setReceiveTimeout);
    mapper.from(templateProperties::getReplyTimeout).whenNonNull()
      .as(Duration::toMillis).to(rabbitTemplate::setReplyTimeout);
    mapper.from(templateProperties::getExchange).to(rabbitTemplate::setExchange);
    mapper.from(templateProperties::getRoutingKey)
      .to(rabbitTemplate::setRoutingKey);
    return rabbitTemplate;
  }

  /**
   * Resolves the template's mandatory flag: the explicit template setting wins;
   * when it is absent the publisher-returns setting is used instead.
   */
  private boolean determineMandatoryFlag() {
    Boolean explicitMandatory = this.properties.getTemplate().getMandatory();
    if (explicitMandatory != null) {
      return explicitMandatory;
    }
    return this.properties.isPublisherReturns();
  }

  /**
   * Assembles a {@link RetryTemplate} with a simple retry policy and an
   * exponential back-off policy taken from the given retry settings.
   *
   * @param properties the retry settings
   * @return the configured retry template
   */
  private RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) {
    PropertyMapper mapper = PropertyMapper.get();
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(buildRetryPolicy(mapper, properties));
    retryTemplate.setBackOffPolicy(buildBackOffPolicy(mapper, properties));
    return retryTemplate;
  }

  /** Builds the retry policy carrying the configured maximum number of attempts. */
  private SimpleRetryPolicy buildRetryPolicy(PropertyMapper mapper,
    RabbitProperties.Retry retry) {
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    mapper.from(retry::getMaxAttempts).to(retryPolicy::setMaxAttempts);
    return retryPolicy;
  }

  /** Builds the exponential back-off policy (initial interval, multiplier, max interval). */
  private ExponentialBackOffPolicy buildBackOffPolicy(PropertyMapper mapper,
    RabbitProperties.Retry retry) {
    ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
    mapper.from(retry::getInitialInterval).whenNonNull().as(Duration::toMillis)
      .to(exponentialBackOff::setInitialInterval);
    mapper.from(retry::getMultiplier).to(exponentialBackOff::setMultiplier);
    mapper.from(retry::getMaxInterval).whenNonNull().as(Duration::toMillis)
      .to(exponentialBackOff::setMaxInterval);
    return exponentialBackOff;
  }

  /**
   * Creates the default {@link AmqpAdmin}. Registered only when there is a single
   * candidate {@link ConnectionFactory}, no {@link AmqpAdmin} bean is defined yet,
   * and the {@code spring.rabbitmq.dynamic} property condition matches (it also
   * matches when the property is missing).
   *
   * @param connectionFactory the connection factory the admin operates on
   * @return a {@link RabbitAdmin} bound to the given connection factory
   */
  @Bean
  @ConditionalOnSingleCandidate(ConnectionFactory.class)
  @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
  @ConditionalOnMissingBean(AmqpAdmin.class)
  public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    return rabbitAdmin;
  }
}

1.3 MessagingTemplateConfiguration

基于IOC容器中RabbitTemplate 生成RabbitMessagingTemplate,因此需要通过import引入RabbitTemplateConfiguration配置类

/**
 * Contributes a {@link RabbitMessagingTemplate} on top of the
 * {@link RabbitTemplate}.
 *
 * <p>Conditional on {@link RabbitMessagingTemplate} being on the classpath and
 * on no such bean being defined yet; imports {@link RabbitTemplateConfiguration}
 * so the underlying template is available.
 */
@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {

  /**
   * Wraps the single candidate {@link RabbitTemplate} in a messaging template.
   *
   * @param rabbitTemplate the template to delegate to
   * @return the messaging template
   */
  @Bean
  @ConditionalOnSingleCandidate(RabbitTemplate.class)
  public RabbitMessagingTemplate rabbitMessagingTemplate(
    RabbitTemplate rabbitTemplate) {
    RabbitMessagingTemplate messagingTemplate =
      new RabbitMessagingTemplate(rabbitTemplate);
    return messagingTemplate;
  }

}

2. RabbitAnnotationDrivenConfiguration

2.1 RabbitAnnotationDrivenConfiguration

配置类RabbitAnnotationDrivenConfiguration在自动配置类RabbitAutoConfiguration中通过import被引入,该配置类的主要目的是注入工厂RabbitListenerContainerFactory。

分成两步,第一步在用户没有自定义的情况下注入默认的configurer,然后基于configurer注入RabbitListenerContainerFactory。

分成两类,分别是simple类型和direct类型。以simple类型的为例:

/**
 * Creates the default configurer for simple listener container factories,
 * handing it the message converter, the message recoverer and the RabbitMQ
 * properties held by this configuration. Conditional on the bean being missing.
 *
 * @return the populated configurer
 */
@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
  SimpleRabbitListenerContainerFactoryConfigurer simpleConfigurer =
    new SimpleRabbitListenerContainerFactoryConfigurer();
  simpleConfigurer.setMessageConverter(this.messageConverter.getIfUnique());
  simpleConfigurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
  simpleConfigurer.setRabbitProperties(this.properties);
  return simpleConfigurer;
}

/**
 * Creates the {@code rabbitListenerContainerFactory} bean of the simple type.
 * Registered only when no bean with that name exists and
 * {@code spring.rabbitmq.listener.type} is {@code simple} or not set.
 *
 * @param configurer the configurer that applies the settings to the factory
 * @param connectionFactory the connection factory passed to the configurer
 * @return the configured listener container factory
 */
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
  SimpleRabbitListenerContainerFactoryConfigurer configurer,
  ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory containerFactory =
    new SimpleRabbitListenerContainerFactory();
  // All settings are applied by the configurer; nothing is set here directly.
  configurer.configure(containerFactory, connectionFactory);
  return containerFactory;
}

另外,RabbitAnnotationDrivenConfiguration还有一个内部配置类EnableRabbitConfiguration

2.2 EnableRabbitConfiguration

EnableRabbitConfiguration配置类中没有任何的属性和方法,该配置类的主要目的就是为了添加EnableRabbit注解,以启用RabbitListener注解的解析和使用。

/**
 * Marker configuration with no members of its own; it exists to carry
 * {@code @EnableRabbit}. It is conditional on no bean being registered under
 * {@code RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME}.
 */
@Configuration
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {

}

在EnableRabbit注解中通过import引入了一个新的配置类RabbitBootstrapConfiguration

/**
 * Type-level annotation, retained at runtime, that imports
 * {@code RabbitBootstrapConfiguration} into the annotated configuration.
 * It declares no attributes in this listing.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

3. RabbitBootstrapConfiguration

RabbitBootstrapConfiguration配置类主要是注入processor用于解析RabbitListener注解,同时注入一个默认的RabbitListenerEndpointRegistry。

/**
 * Registers the two beans declared below: a
 * {@link RabbitListenerAnnotationBeanPostProcessor} and a default
 * {@link RabbitListenerEndpointRegistry}.
 */
@Configuration
public class RabbitBootstrapConfiguration {

  /**
   * Registers the listener annotation post-processor under the bean name
   * defined by {@code RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME}, with the
   * infrastructure role.
   *
   * @return a new post-processor instance
   */
  @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
    RabbitListenerAnnotationBeanPostProcessor postProcessor =
      new RabbitListenerAnnotationBeanPostProcessor();
    return postProcessor;
  }

  /**
   * Registers the default endpoint registry under the bean name defined by
   * {@code RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME}.
   *
   * @return a new endpoint registry instance
   */
  @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
    RabbitListenerEndpointRegistry endpointRegistry =
      new RabbitListenerEndpointRegistry();
    return endpointRegistry;
  }
}