Spring-rabbit之@RabbitListener解析

/ MQJava / 没有评论 / 2399浏览

rabbitmq中消费者可使用@RabbitListener标注的方法进行处理,这里介绍下@RabbitListener解析过程。

解析类

RabbitListenerAnnotationBeanPostProcessor类用于解析RabbitListener注解,该类实现了BeanPostProcessor,Ordered,BeanFactoryAware,BeanClassLoaderAware,EnvironmentAware说明该类具有以下特性:

  1. 是一个Bean后处理,@RabbitListener标注的类(或方法所在类)必须可被Bean实例化,也就是要么在@Bean方法中手动创建实例,或者用@@Component标注。
  2. 可设定Bean初始化优先级
  3. 注入BeanFactory,Environment,可以获取Bean工厂中的Bean实例和环境变量
  4. 单例实例化后处理
  5. 解析入口方法:postProcessAfterInitialization

查找RabbitListener标注的类或方法

  1. Bean工厂类中的Bean都是通过动态代理创建的代理类,要获取目标类,先使用AopUtils
    Class<?> targetClass = AopUtils.getTargetClass(bean);
  1. 获取目标类中RabbitListener注解

有2种方式配置RabbitListener,一种使用RabbitListener标注类且使用RabbitHandler标注方法,另一种使用RabbitListener标注方法,两种模式同时存在也可以。使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法

//查找RabbitListener标注的类
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);

ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
    @Override
    public void doWith(Method method) 
        throws IllegalArgumentException, IllegalAccessException {
        //查找RabbitListener标注的方法
        Collection<RabbitListener> listenerAnnotations = 
            findListenerAnnotations(method);
        if (listenerAnnotations.size() > 0) {
            methods.add(new ListenerMethod(method, listenerAnnotations.toArray(
                new RabbitListener[listenerAnnotations.size()])));
        }
        //存在RabbitListener标注的类
        if (hasClassLevelListeners) {
            //查找RabbitHandler标注的方法
            RabbitHandler rabbitHandler = 
                AnnotationUtils.findAnnotation(method, RabbitHandler.class);
            if (rabbitHandler != null) {
                multiMethods.add(method);
            }
        }
    }
}, ReflectionUtils.USER_DECLARED_METHODS);//查找范围是除了Object类之外的方法

处理@RabbitListener标注的方法

@RabbitListener中可使用SpEL表达式

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    //Id
    endpoint.setId(getEndpointId(rabbitListener));
    //解析队列
    endpoint.setQueueNames(resolveQueues(rabbitListener));
    //独占队列
    endpoint.setExclusive(rabbitListener.exclusive());
    //分组
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }
    //优先级
    String priority = resolve(rabbitListener.priority());
    if (StringUtils.hasText(priority)) {
        endpoint.setPriority(Integer.valueOf(priority));
    }
    //查找rabbitAdmin
    String rabbitAdmin = resolve(rabbitListener.admin());
    if (StringUtils.hasText(rabbitAdmin)) {
        endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
    }

    //查找containerFactory
    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        factory = this.beanFactory.getBean(
            containerFactoryBeanName, RabbitListenerContainerFactory.class);
    }
    //将endpoint关联到containerFactory中
    this.registrar.registerEndpoint(endpoint, factory);
}

private String[] resolveQueues(RabbitListener rabbitListener) {
    String[] queues = rabbitListener.queues();
    QueueBinding[] bindings = rabbitListener.bindings();
    //queues和bindings都配置会抛出异常
    if (queues.length > 0 && bindings.length > 0) {
        throw new BeanInitializationException(
            "@RabbitListener can have 'queues' or 'bindings' but not both");
    }
    List<String> result = new ArrayList<String>();
    if (queues.length > 0) {
        for (int i = 0; i < queues.length; i++) {
            //队列名称可为SpEL表达式,如#{myQueue},注意不能带.,如my.queue(可能会报错)
            Object resolvedValue = resolveExpression(queues[i]);
            // 解析队列名称,resolvedValue可为String[],
            // Queue,String,最终都会解析为String,存放到result中
            resolveAsString(resolvedValue, result);
        }
    }else {
        return registerBeansForDeclaration(rabbitListener);
    }
    return result.toArray(new String[result.size()]);
}

private String[] registerBeansForDeclaration(RabbitListener rabbitListener) {
    List<String> queues = new ArrayList<String>();
    if (this.beanFactory instanceof ConfigurableBeanFactory) {
        for (QueueBinding binding : rabbitListener.bindings()) {
            //声明队列
            String queueName = declareQueue(binding);
            queues.add(queueName);
            //声明Exchange和Binding
            declareExchangeAndBinding(binding, queueName);
        }
    }
    return queues.toArray(new String[queues.size()]);
}
//具体声明队列和Exchange及Binding的代码可以去看源码

处理@RabbitListener标注的类和@RabbitHandler标注的方法

解析原理同上,只是解析信息封装在MultiMethodRabbitListenerEndpoint中,上面的是MethodRabbitListenerEndpoint.

这里要说明下注解中均可以使用SpEL表达式,SpEL表达式可以解析BeanFactory和Environment中(spring.rabbitmq.emptyStringArguments(多个,分隔))的数据,如我们自己创建的Queue、Exchaneg、Binding等Bean这里都可以通过表达式引用(定义好Bean名称),还比如我们使用@Queue、@Exchange、@binding配置的属性,也可以通过表达式引用,这些配置值可以写在配置文件中。

  1. beanFactory:DefaultListableBeanFactory实例,Bean工厂
  2. resolver:StandardBeanExpressionResolver,通过SpEL表达式解析Bean实例
  3. expressionContext:BeanExpressionContext,封装了beanFactory,Bean实例上下文,提供SpEL表达式解析
  4. environment:StandardServletEnvironment