Springboot RabbitMq源码解析之RabbitListener注解

/ MQ / 没有评论 / 1653浏览

Springboot RabbitMq源码解析之RabbitListener注解

前文

RabbitListener是Springboot RabbitMq中经常用到的一个注解,将被RabbitListener注解的类和方法封装成MessageListener注入MessageListenerContainer。

  1. 当RabbitListener注解在方法上时,对应的方式就是Rabbit消息的监听器。
  2. 当RabbitListener注解在类上时,和RabbitHandle注解配合使用,可以实现不同类型的消息的分发,类中被RabbitHandle注解的方法就是Rabbit消息的监听器。

一、EnableRabbit和RabbitBootstrapConfiguration

在Springboot RabbitMq源码解析之配置类已经提到,在springboot项目中已经通过自动配置类RabbitAutoConfiguration将EnableRabbit引入,而EnableRabbit又通过import注解引入了配置类RabbitBootstrapConfiguration。

@Configuration
public class RabbitBootstrapConfiguration {

  @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
    return new RabbitListenerAnnotationBeanPostProcessor();
  }

  @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
    return new RabbitListenerEndpointRegistry();
  }

}

可以看到,其向IOC容器注入了bean后置处理器RabbitListenerAnnotationBeanPostProcessor,同时也注入了一个默认的RabbitListenerEndpointRegistry。

二、RabbitListenerAnnotationBeanPostProcessor

RabbitListenerAnnotationBeanPostProcessor类实现了BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton接口,Ordered表示处理顺序,BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware主要用于获取对应的BeanFactory,BeanClassLoader, Environment属性,我们主要关注从SmartInitializingSingleton和BeanPostProcessor继承的方法。

1. RabbitListenerAnnotationBeanProcessor#afterSingletonsInstantiated

public void afterSingletonsInstantiated() {
  this.registrar.setBeanFactory(this.beanFactory);

  if (this.beanFactory instanceof ListableBeanFactory) {
    Map<String, RabbitListenerConfigurer> instances =
      ((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
    for (RabbitListenerConfigurer configurer : instances.values()) {
      configurer.configureRabbitListeners(this.registrar);
    }
  }

  if (this.registrar.getEndpointRegistry() == null) {
    if (this.endpointRegistry == null) {
      Assert.state(this.beanFactory != null,
                   "BeanFactory must be set to find endpoint registry by bean name");
      this.endpointRegistry = this.beanFactory.getBean(
        RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
        RabbitListenerEndpointRegistry.class);
    }
    this.registrar.setEndpointRegistry(this.endpointRegistry);
  }

  if (this.containerFactoryBeanName != null) {
    this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
  }

  // Set the custom handler method factory once resolved by the configurer
  MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
  if (handlerMethodFactory != null) {
    this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
  }

  // Actually register all listeners
  this.registrar.afterPropertiesSet();

  // clear the cache - prototype beans will be re-cached.
  this.typeCache.clear();
}

初始化工作,主要是基于自定义配置RabbitListenerConfigurer进行RabbitListenerAnnotationBeanPostProcessor(尤其是registrar元素)的初始化。

2. RabbitListenerAnnotationBeanPostProcessor#postProcessBeforeInitialization

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
  return bean;
}

没有做任何处理。

3. RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitilization

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  Class<?> targetClass = AopUtils.getTargetClass(bean);
  final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
  for (ListenerMethod lm : metadata.listenerMethods) {
    for (RabbitListener rabbitListener : lm.annotations) {
      processAmqpListener(rabbitListener, lm.method, bean, beanName);
    }
  }
  if (metadata.handlerMethods.length > 0) {
    processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
  }
  return bean;
}

RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitilization是整个类中最核心的方法,对RabbitListener注解查找和解析。具体分析见下文。

三、RabbitListenerAnnotationBeanPostProcessor对RabbitListener注解的解析

RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitilization方法在上文已经提到,此处针对具体方法逻辑进行解析。

1 RabbitListenerAnnotationBeanPostProcessor#buildMetadata

private TypeMetadata buildMetadata(Class<?> targetClass) {
  Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
  final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
  final List<ListenerMethod> methods = new ArrayList<>();
  final List<Method> multiMethods = new ArrayList<>();
  ReflectionUtils.doWithMethods(targetClass, method -> {
    Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
    if (listenerAnnotations.size() > 0) {
      methods.add(new ListenerMethod(
        method,
        listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
    }
    if (hasClassLevelListeners) {
      RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
      if (rabbitHandler != null) {
        multiMethods.add(method);
      }
    }
  }, ReflectionUtils.USER_DECLARED_METHODS);
  if (methods.isEmpty() && multiMethods.isEmpty()) {
    return TypeMetadata.EMPTY;
  }
  return new TypeMetadata(
    methods.toArray(new ListenerMethod[methods.size()]),
    multiMethods.toArray(new Method[multiMethods.size()]),
    classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

前面我们已经提到,RabbitListener有两种用法:

2 RabbitListenerAnnotationBeanPostProcessor的processAmqpListener和processMultiMethodListeners方法

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
  Method methodToUse = checkProxy(method, bean);
  MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
  endpoint.setMethod(methodToUse);
  endpoint.setBeanFactory(this.beanFactory);
  endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
  if (StringUtils.hasText(errorHandlerBeanName)) {
    endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
  }
  processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
                                         Object bean, String beanName) {
  List<Method> checkedMethods = new ArrayList<Method>();
  for (Method method : multiMethods) {
    checkedMethods.add(checkProxy(method, bean));
  }
  for (RabbitListener classLevelListener : classLevelListeners) {
    MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
  }
}

RabbitListenerAnnotationBeanPostProcessor#processAmqpListener针对被RabbitListener注解的方法进行解析,RabbitListenerAnnotationBeanPostProcessot#processMultiMethodListeners针对RabbitListener注解的类中被RabbitHandle注解的方法进行解析。这两个方法解析目标虽然不同,但逻辑并没有什么实质差别。首先通过checkProxy方法对方法进行校验,然后新建MultiMethodRabbitListenerEndpoint对象,针对两种方式的差异进行部分属性的初始化后交给RabbitListenerAnnotationBeanPostProcessor进行后续处理。checkProxy针对jdk动态代理的情况进行检查,检查代理的目标接口是否含有对应方法。

3 RabbitListenerAnnotationBeanPostProcessor#processListener

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {
  endpoint.setBean(bean);
  endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  endpoint.setId(getEndpointId(rabbitListener));
  endpoint.setQueueNames(resolveQueues(rabbitListener));
  endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  String group = rabbitListener.group();
  if (StringUtils.hasText(group)) {
    Object resolvedGroup = resolveExpression(group);
    if (resolvedGroup instanceof String) {
      endpoint.setGroup((String) resolvedGroup);
    }
  }
  String autoStartup = rabbitListener.autoStartup();
  if (StringUtils.hasText(autoStartup)) {
    endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
  }

  endpoint.setExclusive(rabbitListener.exclusive());
  String priority = resolve(rabbitListener.priority());
  if (StringUtils.hasText(priority)) {
    try {
      endpoint.setPriority(Integer.valueOf(priority));
    }
    catch (NumberFormatException ex) {
      throw new BeanInitializationException(
        "Invalid priority value for " + rabbitListener + " (must be an integer)", ex);
    }
  }

  String rabbitAdmin = resolve(rabbitListener.admin());
  if (StringUtils.hasText(rabbitAdmin)) {
    Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
    try {
      endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
    }
    catch (NoSuchBeanDefinitionException ex) {
      throw new BeanInitializationException(
        "Could not register rabbit listener endpoint on [" +
        adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
        rabbitAdmin + "' was found in the application context", ex);
    }
  }


  RabbitListenerContainerFactory<?> factory = null;
  String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
  if (StringUtils.hasText(containerFactoryBeanName)) {
    Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
    try {
      factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
    }
    catch (NoSuchBeanDefinitionException ex) {
      throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                                            adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                                            containerFactoryBeanName + "' was found in the application context", ex);
    }
  }

  this.registrar.registerEndpoint(endpoint, factory);
}

主要根据RabbitListener注解的属性进行MethodRabbitListenerEndpoint 的属性设置和校验,最后通过RabbitListenerEndpointRegistrar#registerEndpoint方法将MethodRabbitListenerEndpoint 注入容器RabbitListenerContainerFactory。

四、RabbitListenerEndpointRegistrar

1. RabbitListenerEndpointRegistrar#afterPropertiesSet

RabbitListenerEndpointRegistrar同样实现了InitializingBean接口,首先关注重写的afterPropertiesSet方法。

@Override
public void afterPropertiesSet() {
  registerAllEndpoints();
}

protected void registerAllEndpoints() {
  synchronized (this.endpointDescriptors) {
    for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
      this.endpointRegistry.registerListenerContainer(
        descriptor.endpoint, resolveContainerFactory(descriptor));
    }
    this.startImmediately = true;  // trigger immediate startup
  }
}

2. RabbitListenerEndpointRegistrar#registerEndpoint

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
  Assert.notNull(endpoint, "Endpoint must be set");
  Assert.hasText(endpoint.getId(), "Endpoint id must be set");
  // Factory may be null, we defer the resolution right before actually creating the container
  AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
  synchronized (this.endpointDescriptors) {
    if (this.startImmediately) { // Register and start immediately
      this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                                                      resolveContainerFactory(descriptor), true);
    }
    else {
      this.endpointDescriptors.add(descriptor);
    }
  }
}

结合上面的RabbitListenerEndpointRegistrar#afterPropertiesSet可以看出,如果RabbitListenerEndpointRegistrar类已经启动(已经执行了afterPropertiesSet方法),则立即委托RabbitListenerEndpointRegistry#registerListenerContainer方法进行注册,否则加入列表中在afterPropertiesSet方法中进行注册。

无论是RabbitListenerEndpointRegistrar#afterPropertiesSet还是RabbitListenerEndpointRegistrar#registerEndpoint 都通过RabbitListenerEndpointRegistry#registerListenerContainer进行注册监听器的容器,但是两者传入方法的第3个参数不同,分别为true和false。

五、RabbitListenerEndpointRegistry

1. RabbitListenerEndpointRegistry#registerListenerContainer

public void registerListenerContainer(
  RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
  boolean startImmediately) {
  Assert.notNull(endpoint, "Endpoint must not be null");
  Assert.notNull(factory, "Factory must not be null");

  String id = endpoint.getId();
  Assert.hasText(id, "Endpoint id must not be empty");
  synchronized (this.listenerContainers) {
    Assert.state(!this.listenerContainers.containsKey(id),
                 "Another endpoint is already registered with id '" + id + "'");
    MessageListenerContainer container = createListenerContainer(endpoint, factory);
    this.listenerContainers.put(id, container);
    if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
      List<MessageListenerContainer> containerGroup;
      if (this.applicationContext.containsBean(endpoint.getGroup())) {
        containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
      }
      else {
        containerGroup = new ArrayList<MessageListenerContainer>();
        this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
      }
      containerGroup.add(container);
    }
    if (startImmediately) {
      startIfNecessary(container);
    }
  }
}

RabbitListenerEndpointRegistry#registerListenerContainer方法基于RabbitListenerEndpoint根据监听器的容器工厂类生成一个监听器的容器,并且整个注册过程是同步的,同时最多只能有一个endpoint在注册。 可以看到,第三个参数startImmediately的作用就是是否要调用startIfNecessary方法。

private void startIfNecessary(MessageListenerContainer listenerContainer) {
  if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    listenerContainer.start();
  }
}

2. RabbitListenerEndpointRegistry#start

RabbitListenerEndpointRegistry实现了SmartLifecycle,对start方法进行了重写。

@Override
public void start() {
  for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    startIfNecessary(listenerContainer);
  }
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
  if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    listenerContainer.start();
  }
}

上文我们已经了解到,RabbitListenerEndpointRegistry#registerListenerContainer中只有第三个参数我true时才会调用startIfNecessary方法,这是因为在RabbitListenerEndpointRegistry#start方法中本身就已经对listenerContainers中的监听器进行了相应的调用,所以并不是所有的listenerContainer都需要在RabbitListenerEndpointRegistry#registerListenerContainer中通过startIfNecessary方法启动。

当listenerContainer自动启动或者上下文刷新时,调用MessageListenerContainer#start方法。 这样子,我们的逻辑就进入了监听器的启动,可以进入到之前的博客中的Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer的解析过程。