Springboot RabbitMq源码解析之RabbitListener的MessageListener#onMessage解析

/ MQ / 没有评论 / 1581浏览

Springboot RabbitMq源码解析之RabbitListener的MessageListener#onMessage解析

前文

在Springboot RabbitMq源码解析之RabbitListener注解中,我们解析了Springboot中是如何根据RabbitListener注解生成MessageListenerContainer。

在Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer,我们以SimpleMessageListenerContainer解析了MessageListener是如何消息rabbitmq消息的。对于messageListener类型的监听器,SimpleMessageListenerContainer最终是通过MessagListener#onMessage方法进行了消息的消费逻辑。

接下来,我们关注的重点是在RabbitListener注解的MessageListener#onMessage处理逻辑。

一、 生成MessageListenerContainer

从上文可以看到,在 RabbitListenerEndpointRegistry#registerListenerContainer中的通过MessageListenerContainer container = createListenerContainer(endpoint, factory);生成MessageListenerContainer。

1. RabbitListenerEndpointRegistry#createListenerContainer

protected MessageListenerContainer createListenerContainer(
  RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory) {
  MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  if (listenerContainer instanceof InitializingBean) {
    try {
      ((InitializingBean) listenerContainer).afterPropertiesSet();
    }
    catch (Exception ex) {
      throw new BeanInitializationException("Failed to initialize message listener container", ex);
    }
  }

  int containerPhase = listenerContainer.getPhase();
  if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
    if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
      throw new IllegalStateException(
        "Encountered phase mismatch between container factory definitions: " +
        this.phase + " vs " + containerPhase);
    }
    this.phase = listenerContainer.getPhase();
  }

  return listenerContainer;
}

点击MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);继续跟踪。

2. AbstractRabbitListenerContainerFactory#createListenerContainer

@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
  C instance = createContainerInstance();

  if (this.connectionFactory != null) {
    instance.setConnectionFactory(this.connectionFactory);
  }
  if (this.errorHandler != null) {
    instance.setErrorHandler(this.errorHandler);
  }
  if (this.messageConverter != null) {
    instance.setMessageConverter(this.messageConverter);
  }
  if (this.acknowledgeMode != null) {
    instance.setAcknowledgeMode(this.acknowledgeMode);
  }
  if (this.channelTransacted != null) {
    instance.setChannelTransacted(this.channelTransacted);
  }
  if (this.applicationContext != null) {
    instance.setApplicationContext(this.applicationContext);
  }
  if (this.taskExecutor != null) {
    instance.setTaskExecutor(this.taskExecutor);
  }
  if (this.transactionManager != null) {
    instance.setTransactionManager(this.transactionManager);
  }
  if (this.prefetchCount != null) {
    instance.setPrefetchCount(this.prefetchCount);
  }
  if (this.defaultRequeueRejected != null) {
    instance.setDefaultRequeueRejected(this.defaultRequeueRejected);
  }
  if (this.adviceChain != null) {
    instance.setAdviceChain(this.adviceChain);
  }
  if (this.recoveryBackOff != null) {
    instance.setRecoveryBackOff(this.recoveryBackOff);
  }
  if (this.mismatchedQueuesFatal != null) {
    instance.setMismatchedQueuesFatal(this.mismatchedQueuesFatal);
  }
  if (this.missingQueuesFatal != null) {
    instance.setMissingQueuesFatal(this.missingQueuesFatal);
  }
  if (this.consumerTagStrategy != null) {
    instance.setConsumerTagStrategy(this.consumerTagStrategy);
  }
  if (this.idleEventInterval != null) {
    instance.setIdleEventInterval(this.idleEventInterval);
  }
  if (this.failedDeclarationRetryInterval != null) {
    instance.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval);
  }
  if (this.applicationEventPublisher != null) {
    instance.setApplicationEventPublisher(this.applicationEventPublisher);
  }
  if (endpoint.getAutoStartup() != null) {
    instance.setAutoStartup(endpoint.getAutoStartup());
  }
  else if (this.autoStartup != null) {
    instance.setAutoStartup(this.autoStartup);
  }
  if (this.phase != null) {
    instance.setPhase(this.phase);
  }
  if (this.afterReceivePostProcessors != null) {
    instance.setAfterReceivePostProcessors(this.afterReceivePostProcessors);
  }
  instance.setListenerId(endpoint.getId());

  endpoint.setupListenerContainer(instance);
  initializeContainer(instance, endpoint);

  return instance;
}

AbstractRabbitListenerContainerFactory是MessageListener的抽象工厂类,方法中的绝大多数内容都只是给对象属性赋值,需要关注的是endpoint.setupListenerContainer(instance);语句。

3. AbstractRabbitListenerEndpoint#setListenerContainer

@Override
public void setupListenerContainer(MessageListenerContainer listenerContainer) {
  AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer;

  boolean queuesEmpty = getQueues().isEmpty();
  boolean queueNamesEmpty = getQueueNames().isEmpty();
  if (!queuesEmpty && !queueNamesEmpty) {
    throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
  }
  if (queuesEmpty) {
    Collection<String> names = getQueueNames();
    container.setQueueNames(names.toArray(new String[names.size()]));
  }
  else {
    Collection<Queue> instances = getQueues();
    container.setQueues(instances.toArray(new Queue[instances.size()]));
  }

  container.setExclusive(isExclusive());
  if (getPriority() != null) {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-priority", getPriority());
    container.setConsumerArguments(args);
  }

  if (getAdmin() != null) {
    container.setRabbitAdmin(getAdmin());
  }
  setupMessageListener(listenerContainer);
}
protected abstract MessageListener createMessageListener(MessageListenerContainer container);

private void setupMessageListener(MessageListenerContainer container) {
  MessageListener messageListener = createMessageListener(container);
  Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
  container.setupMessageListener(messageListener);
}

二、 生成MessageListener

可以看到,上文中生成MessageListener的关键方法AbstractRabbitListenerEndpoint#createMessageListener是一个抽象方法。AbstractRabbitListenerEndpoint有3个子类,分别SimpleRabbitListenerEndpoint,MethodRabbitListenerEndpoint和MultiMethodRabbitListenerEndpoint,其中MultiMethodRabbitListenerEndpoint是MethodRabbitListenerEndpoint的子类。回到上文RabbitListenerAnnotationBeanPostProcessor的processAmqpListener和processMultiMethodListeners可以看到,RabbitListener注解对应的endpoint有两种。

  1. 当RabbitListener注解在方法上时,对应的endpoint就是MethodRabbitListenerEndpoint。
  2. 当RabbitListener注解在类上时,和RabbitHandle注解配合使用,对应的endpoint就是MultiMethodRabbitListenerEndpoint。

1. MethodRabbitListenerEndpoint#createMessageListener

@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
  Assert.state(this.messageHandlerMethodFactory != null,
               "Could not create message listener - MessageHandlerMethodFactory not set");
  MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
  messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
  String replyToAddress = getDefaultReplyToAddress();
  if (replyToAddress != null) {
    messageListener.setResponseAddress(replyToAddress);
  }
  MessageConverter messageConverter = container.getMessageConverter();
  if (messageConverter != null) {
    messageListener.setMessageConverter(messageConverter);
  }
  if (getBeanResolver() != null) {
    messageListener.setBeanResolver(getBeanResolver());
  }
  return messageListener;
}

/**
 * Create a {@link HandlerAdapter} for this listener adapter.
 * @param messageListener the listener adapter.
 * @return the handler adapter.
 */
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
  InvocableHandlerMethod invocableHandlerMethod =
    this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
  return new HandlerAdapter(invocableHandlerMethod);
}

/**
 * Create an empty {@link MessagingMessageListenerAdapter} instance.
 * @return the {@link MessagingMessageListenerAdapter} instance.
 */
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
  return new MessagingMessageListenerAdapter(this.bean, this.method, this.returnExceptions, this.errorHandler);
}

可以看到RabbitListener注解对应的MessageListener类型是MessagingMessageListenerAdapter。

2. MultiMethodRabbitListenerEndpoint#configureListenerAdapter

MultiMethodRabbitListenerEndpoint作为MethodRabbitListenerEndpoint的子类,其createMessageListener方法与父类相同,不过其重写了其中的configureListenerAdapter方法。

@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
  List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
  for (Method method : this.methods) {
    invocableHandlerMethods.add(
      getMessageHandlerMethodFactory().createInvocableHandlerMethod(getBean(), method));
  }
  this.delegatingHandler = new DelegatingInvocableHandler(
    invocableHandlerMethods, getBean(), getResolver(),getBeanExpressionContext());
  return new HandlerAdapter(this.delegatingHandler);
}

三. HandlerAdapter类

可以看到MethodRabbitListenerEndpoint和MultiMethodRabbitListenerEndpoint对应的都是MessagingMessageListenerAdapter,彼此的最大区别就是handlerMethod不同,为了保持该属性的类型一致,特地新建了一个适配器类:HandlerAdapter。

HandlerAdapter的属性很简单,只有两个:InvocableHandlerMethod和DelegatingInvocableHandler。

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
  if (this.invokerHandlerMethod != null) {
    return this.invokerHandlerMethod.invoke(message, providedArgs);
  }
  else {
    return this.delegatingHandler.invoke(message, providedArgs);
  }
}

1. InvocableHandlerMethod类

InvocableHandlerMethod类是HandlerMethod类的子类,核心方法是invoke方法,基于接收到的Message和参数执行rabbit的监听方法。

@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
  Object[] args = getMethodArgumentValues(message, providedArgs);
  if (logger.isTraceEnabled()) {
    logger.trace("Invoking '" + ClassUtils.getQualifiedMethodName(getMethod(), getBeanType()) +
                 "' with arguments " + Arrays.toString(args));
  }
  Object returnValue = doInvoke(args);
  if (logger.isTraceEnabled()) {
    logger.trace(
      "Method [" + ClassUtils.getQualifiedMethodName(getMethod(), getBeanType()) +
      "] returned [" + returnValue + "]");
  }
  return returnValue;
}

2. DelegatingInvocableHandler类

当RabbitListener注解在类上时,可以同时有多个被RabbitHandle注解的方法用于处理同意queue的消息,因此在DelegatingInvocableHandler类中一个属性rivate final List handlers;为InvocableHandlerMethod的集合,同样的核心方法也是invoke方法,多了一个根据消息内容的类型选择对应的InvocableHandlerMethod的逻辑。

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
  Class<? extends Object> payloadClass = message.getPayload().getClass();
  InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
  Object result = handler.invoke(message, providedArgs);
  if (message.getHeaders().get(AmqpHeaders.REPLY_TO) == null) {
    Expression replyTo = this.handlerSendTo.get(handler);
    if (replyTo != null) {
      result = new AbstractAdaptableMessageListener.ResultHolder(result, replyTo);
    }
  }
  return result;
}

四、MessagingMessageListenerAdapter类

@Override
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
  Message<?> message = toMessagingMessage(amqpMessage);
  if (logger.isDebugEnabled()) {
    logger.debug("Processing [" + message + "]");
  }
  try {
    Object result = invokeHandler(amqpMessage, channel, message);
    if (result != null) {
      handleResult(result, amqpMessage, channel, message);
    }
    else {
      logger.trace("No result object given - no result to handle");
    }
  }
  catch (ListenerExecutionFailedException e) {
    if (this.errorHandler != null) {
      try {
        Object result = this.errorHandler.handleError(amqpMessage, message, e);
        if (result != null) {
          handleResult(result, amqpMessage, channel, message);
        }
        else {
          logger.trace("Error handler returned no result");
        }
      }
      catch (Exception ex) {
        returnOrThrow(amqpMessage, channel, message, ex, ex);
      }
    }
    else {
      returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
    }
  }
}

可以看出,4个核心步骤委托给其它方法处理

  1. Message<?> message = toMessagingMessage(amqpMessage); 消息类型转换
  2. Object result = invokeHandler(amqpMessage, channel, message); 执行消费方法
  3. handleResult(result, amqpMessage, channel, message); 处理执行结果以发送响应
  4. Object result = this.errorHandler.handleError(amqpMessage, message, e); 第(2)步异常时的异常处理
  5. returnOrThrow(amqpMessage, channel, message, ex, ex);异常抛出或返回

1 Message<?> message = toMessagingMessage(amqpMessage);

protected Message<?> toMessagingMessage(org.springframework.amqp.core.Message amqpMessage) {
  return (Message<?>) getMessagingMessageConverter().fromMessage(amqpMessage);
}

进行消息类型转换,将org.springframework.amqp.core.Message类型通过messagingMessageConverter解析成org.springframework.messaging.Message类型,这一点与消息发送时的RabbitTemplate#convertAndSend中的getRequiredMessageConverter().toMessage(object, new MessageProperties())相对应。

2 Object result = invokeHandler(amqpMessage, channel, message);

private Object invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
                             Message<?> message) {
  try {
    return this.handlerMethod.invoke(message, amqpMessage, channel);
  }
  catch (MessagingException ex) {
    throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
                                                                           "be invoked with the incoming message", message.getPayload()), ex, amqpMessage);
  }
  catch (Exception ex) {
    throw new ListenerExecutionFailedException("Listener method '" +
                                               this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex, amqpMessage);
  }
}

进行消息消费,执行被RabbitListener或RabbitHandler注解的方法逻辑,对于出现的异常封装成ListenerExecutionFailedException抛出。

3 handleResult(result, amqpMessage, channel, message);

protected void handleResult(Object resultArg, Message request, Channel channel, Object source) throws Exception {
  if (channel != null) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug("Listener method returned result [" + resultArg
                        + "] - generating response message for it");
    }
    try {
      Object result = resultArg instanceof ResultHolder ? ((ResultHolder) resultArg).result : resultArg;
      Message response = buildMessage(channel, result);
      postProcessResponse(request, response);
      Address replyTo = getReplyToAddress(request, source, resultArg);
      sendResponse(channel, replyTo, response);
    }
    catch (Exception ex) {
      throw new ReplyFailureException("Failed to send reply with payload '" + resultArg + "'", ex);
    }
  }
  else if (this.logger.isWarnEnabled()) {
    this.logger.warn("Listener method returned result [" + resultArg
                     + "]: not generating response message for it because no Rabbit Channel given");
  }
}

如果消息的消费方法或者异常处理有执行结果或者异常需要返回时,进入该方法进行响应处理 如果rabbit channel存在,将执行结果封装后作为响应消息返回。

4 Object result = this.errorHandler.handleError(amqpMessage, message, e);

@FunctionalInterface
public interface RabbitListenerErrorHandler {
  /**
	 * Handle the error. If an exception is not thrown, the return value is returned to
	 * the sender using normal {@code replyTo/@SendTo} semantics.
	 * @param amqpMessage the raw message received.
	 * @param message the converted spring-messaging message.
	 * @param exception the exception the listener threw, wrapped in a
	 * {@link ListenerExecutionFailedException}.
	 * @return the return value to be sent to the sender.
	 * @throws Exception an exception which may be the original or different.
	 */
  Object handleError(
    Message amqpMessage, org.springframework.messaging.Message<?> message,
    ListenerExecutionFailedException exception) throws Exception;

}

当消息的消费方法抛出异常且RabbitListener注解配置了errorHandler时,进入RabbitListenerErrorHandler 的异常处理逻辑。 如果异常处理逻辑有返回值,将其作为消费方法的执行结果需要做(3)中的响应处理。

5 returnOrThrow(amqpMessage, channel, message, ex, ex)

private void returnOrThrow(
  org.springframework.amqp.core.Message amqpMessage, Channel channel, Message<?> message,
  Throwable throwableToReturn, Exception exceptionToThrow) throws Exception {
  if (!this.returnExceptions) {
    throw exceptionToThrow;
  }
  try {
    handleResult(new RemoteInvocationResult(throwableToReturn), amqpMessage, channel, message);
  }
  catch (ReplyFailureException rfe) {
    if (void.class.equals(this.handlerMethod.getReturnType(message.getPayload()))) {
      throw exceptionToThrow;
    }
    else {
      throw rfe;
    }
  }
}

如果异常处理过程中出现异常或者没有异常处理时,进入该方法进行异常的抛出或返回。 根据RabbitListener注解的returnExceptions属性进行判断,如果为false则直接抛出异常,否则将异常封装成RemoteInvocationResult作为消息消费的执行结果进行(3)中的响应处理。

五、SendTo注解和消息响应

前面分析到,当消费方法的执行结果满足下列条件之一时,会发送响应消息。

  1. 消费方法有执行结果
  2. 消费方法抛出异常且RabbitListener配置了errorHandler,此时errorHandler的异常处理有返回结果
  3. 消费方法抛出异常且RabbitListener没有配置errorHandler(或配置了errorHandler但异常处理也跑出来异常)且RabbitListener的returnExceptions的属性值为true

事实上,我们可以通过与@SendTo注解结合将执行结果发给指定目标,具体方式如下:

  1. RabbitListener注解在方法上时,在同一方法上加上@SendTo注解
  2. RabbitListener注解在类上时,在同一类上加上@SendTo注解或者在该类中被RabbitHandler注解的方法上加上@SendTo注解

同时,需要注意,此时SendTo的value值的数组长度不能超过1,并且SendTo的值支持Bean Spel表达式,但是返回值必须为String或者Address类型。

接下来,同样分两种情况看看代码的逻辑是如何实现的。

1. MessagingMessageListenerAdapter获取目标地址

AbstractAdaptableMessageListener#getReplyToAddress中获取回复地址的逻辑

protected Address getReplyToAddress(Message request, Object source, Object result) throws Exception {
  Address replyTo = request.getMessageProperties().getReplyToAddress();
  if (replyTo == null) {
    if (this.responseAddress == null && this.responseExchange != null) {
      this.responseAddress = new Address(this.responseExchange, this.responseRoutingKey);
    }
    if (result instanceof ResultHolder) {
      replyTo = evaluateReplyTo(request, source, result, ((ResultHolder) result).sendTo);
    }
    else if (this.responseExpression != null) {
      replyTo = evaluateReplyTo(request, source, result, this.responseExpression);
    }
    else if (this.responseAddress == null) {
      throw new AmqpException(
        "Cannot determine ReplyTo message property value: " +
        "Request message does not contain reply-to property, " +
        "and no default response Exchange was set.");
    }
    else {
      replyTo = this.responseAddress;
    }
  }
  return replyTo;
}

可以看出优先级依次是

  1. 在消息中的messageProperties中配置replyTo属性。
  2. 执行结果为ResultHolder类型,根据其sendTo进行取值。
  3. 根据responseExpression属性
  4. 根据responseAddress属性 其余情况下则抛出异常

2. RabbitListener注解在方法上

先看MethodRabbitListenerEndpoint#createMessageListener方法中的String replyToAddress = getDefaultReplyToAddress()。

@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
  Assert.state(this.messageHandlerMethodFactory != null,
               "Could not create message listener - MessageHandlerMethodFactory not set");
  MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
  messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
  String replyToAddress = getDefaultReplyToAddress();
  if (replyToAddress != null) {
    messageListener.setResponseAddress(replyToAddress);
  }
  MessageConverter messageConverter = container.getMessageConverter();
  if (messageConverter != null) {
    messageListener.setMessageConverter(messageConverter);
  }
  if (getBeanResolver() != null) {
    messageListener.setBeanResolver(getBeanResolver());
  }
  return messageListener;
}
private String getDefaultReplyToAddress() {
  Method method = getMethod();
  if (method != null) {
    SendTo ann = AnnotationUtils.getAnnotation(method, SendTo.class);
    if (ann != null) {
      String[] destinations = ann.value();
      if (destinations.length > 1) {
        throw new IllegalStateException(
          "Invalid @" + SendTo.class.getSimpleName() + " annotation on '"
          + method + "' one destination must be set (got " + Arrays.toString(destinations) + ")");
      }
      return destinations.length == 1 ? resolve(destinations[0]) : "";
    }
  }
  return null;
}

可以看出,当配置了SendTo注解,会调用AbstractAdaptableMessageListener#setResponseAddress进行处理。

public void setResponseAddress(String defaultReplyTo) {
  if (defaultReplyTo.startsWith(PARSER_CONTEXT.getExpressionPrefix())) {
    this.responseExpression = PARSER.parseExpression(defaultReplyTo, PARSER_CONTEXT);
  }
  else {
    this.responseAddress = new Address(defaultReplyTo);
  }
}

responseExpression 或 responseAddress 的值不再为null,符合(3)(4)两种情况。

3. RabbitListener注解在类上

此时MethodRabbitListenerEndpoint#createMessageListener方法中的String replyToAddress = getDefaultReplyToAddress()的值为null。

其处理SendTo的逻辑主要在DelegatingInvocableHandler类,具体逻辑在setupReplyTo方法中。

private void setupReplyTo(InvocableHandlerMethod handler) {
  String replyTo = null;
  Method method = handler.getMethod();
  if (method != null) {
    SendTo ann = AnnotationUtils.getAnnotation(method, private void setupReplyTo(InvocableHandlerMethod handler) {
      String replyTo = null;
      Method method = handler.getMethod();
      if (method != null) {
        SendTo ann = AnnotationUtils.getAnnotation(method, SendTo.class);
        replyTo = extractSendTo(method.toString(), ann);
      }
      if (replyTo == null) {
        SendTo ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
        replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
      }
      if (replyTo != null) {
        this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT));
      }
    }.class);
    replyTo = extractSendTo(method.toString(), ann);
  }
  if (replyTo == null) {
    SendTo ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
    replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
  }
  if (replyTo != null) {
    this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT));
  }
}

可以看到,先获取方法上的SendTo注解,不存在则去获取类上的SendTo注解。

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
  Class<? extends Object> payloadClass = message.getPayload().getClass();
  InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
  Object result = handler.invoke(message, providedArgs);
  if (message.getHeaders().get(AmqpHeaders.REPLY_TO) == null) {
    Expression replyTo = this.handlerSendTo.get(handler);
    if (replyTo != null) {
      result = new AbstractAdaptableMessageListener.ResultHolder(result, replyTo);
    }
  }
  return result;
}

然后在invoke中执行完方法对应的InvocableHandlerMethod 后将执行结果和SendTo注解信息一起封装成AbstractAdaptableMessageListener.ResultHolder返回,符合情况(2)。