Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer

/ MQ / 没有评论 / 1561浏览

Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer

前文

一、MessageListenerContainer

在Springboot RabbitMq中,接口MessageListenerContainer负责接收并处理消息。

public interface MessageListenerContainer extends SmartLifecycle {
  /**
	 * Setup the message listener to use. Throws an {@link IllegalArgumentException}
	 * if that message listener type is not supported.
	 * @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
	 */
  void setupMessageListener(Object messageListener);
  /**
	 * @return the {@link MessageConverter} that can be used to
	 * convert {@link org.springframework.amqp.core.Message}, if any.
	 */
  MessageConverter getMessageConverter();
}

从前面的配置类RabbitAnnotationDrivenConfiguration中也能看到,springboot主要支持simple和direct两种方式进行消息的消费,在这里,以SimpleMessageListenerContainer为例查看消息是如何被接收处理的。

二、SimpleMessageListenerContainer启动

1. AbstractMessageListenerContainer#start

首先,MessageListenerContainer继承了SmartLifecycle,因此我们从start方法开始作为介入点进入源码进行查看。

/**
 * Start this container.
 * @see #doStart
 */
@Override
public void start() {
  if (isRunning()) {
    return;
  }
  if (!this.initialized) {
    synchronized (this.lifecycleMonitor) {
      if (!this.initialized) {
        afterPropertiesSet();
        this.initialized = true;
      }
    }
  }
  try {
    if (logger.isDebugEnabled()) {
      logger.debug("Starting Rabbit listener container.");
    }
    configureAdminIfNeeded();
    checkMismatchedQueues();
    doStart();
  }
  catch (Exception ex) {
    throw convertRabbitAccessException(ex);
  }
}

SimpleMessageListenerContainer本身并没有重写start方法,其start方法从父类AbstractMessageListenerContainer继承而来,在AbstractMessageContainer#start方法中,主要进行了包括afterPropertiesSet, configureAdminIfNeeded, checkMismatchedQueues方法在内的一些初始化工作,剩下了工作委托给doStart方法进行处理,方便扩展。

2. SimpleMessageListenerContainer#doStart

SimpleMessageListenerContainer#start方法的目的是校验通过后将消费者consumer提交到线程池taskExecutors,主要逻辑分为3步:

  1. messageListener是ListenerContainerAware时的场景的处理,检查queueNames是否和messsgeListener的expectedQueueNames一致。
if (getMessageListener() instanceof ListenerContainerAware) {
  Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
  if (expectedQueueNames != null) {
    String[] queueNames = getQueueNames();
    Assert.state(expectedQueueNames.size() == queueNames.length,
                 "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                 + Arrays.asList(queueNames));
    boolean found = false;
    for (String queueName : queueNames) {
      if (expectedQueueNames.contains(queueName)) {
        found = true;
      }
      else {
        found = false;
        break;
      }
    }
    Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                 + Arrays.asList(queueNames));
  }
}
  1. 通过super.doStart();初始化active和running属性,并唤醒所有相关线程。
/**
 * Start this container, and notify all invoker tasks.
 * @throws Exception if thrown by Rabbit API methods
    */
protected void doStart() throws Exception {
  // Reschedule paused tasks, if any.
  synchronized (this.lifecycleMonitor) {
    this.active = true;
    this.running = true;
    this.lifecycleMonitor.notifyAll();
  }
}
  1. 初始化consumer并提交
synchronized (this.consumersMonitor) {
  if (this.consumers != null) {
    throw new IllegalStateException("A stopped container should not have consumers");
  }
  int newConsumers = initializeConsumers();
  if (this.consumers == null) {
    logger.info("Consumers were initialized and then cleared " +
                "(presumably the container was stopped concurrently)");
    return;
  }
  if (newConsumers <= 0) {
    if (logger.isInfoEnabled()) {
      logger.info("Consumers are already running");
    }
    return;
  }
  Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  for (BlockingQueueConsumer consumer : this.consumers) {
    AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
    processors.add(processor);
    getTaskExecutor().execute(processor);
    if (getApplicationEventPublisher() != null) {
      getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
    }
  }
  for (AsyncMessageProcessingConsumer processor : processors) {
    FatalListenerStartupException startupException = processor.getStartupException();
    if (startupException != null) {
      throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
    }
  }
}

3.1. 初始化consumer在这儿初始化的consumer为BlockingQueueConsumer类型,consumer数量根据concurrentConsumers而来,默认为1。每个consumer的属性根据SimpleMessageListenerContainer而来。

protected int initializeConsumers() {
  int count = 0;
  synchronized (this.consumersMonitor) {
    if (this.consumers == null) {
      this.cancellationLock.reset();
      this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
      for (int i = 0; i < this.concurrentConsumers; i++) {
        BlockingQueueConsumer consumer = createBlockingQueueConsumer();
        this.consumers.add(consumer);
        count++;
      }
    }
  }
  return count;
}

protected BlockingQueueConsumer createBlockingQueueConsumer() {
  BlockingQueueConsumer consumer;
  String[] queues = getQueueNames();
  // There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
  // didn't get an ack for delivered messages
  int actualPrefetchCount = getPrefetchCount() > this.txSize ? getPrefetchCount() : this.txSize;
  consumer = new BlockingQueueConsumer(
    getConnectionFactory(), getMessagePropertiesConverter(),
    this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
    isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
  if (this.declarationRetries != null) {
    consumer.setDeclarationRetries(this.declarationRetries);
  }
  if (getFailedDeclarationRetryInterval() > 0) {
    consumer.setFailedDeclarationRetryInterval(getFailedDeclarationRetryInterval());
  }
  if (this.retryDeclarationInterval != null) {
    consumer.setRetryDeclarationInterval(this.retryDeclarationInterval);
  }
  if (getConsumerTagStrategy() != null) {
    consumer.setTagStrategy(getConsumerTagStrategy());
  }
  consumer.setBackOffExecution(getRecoveryBackOff().start());
  consumer.setShutdownTimeout(getShutdownTimeout());
  consumer.setApplicationEventPublisher(getApplicationEventPublisher());
  return consumer;
}

3.2. 将BlockingQueueConsumer封装成AsyncMessageProcessingConsumer并提交到线程池,同时检查启动结果。SimpleMessageListenerContainer#doStart通过AsyncMessageProcessingConsumer#getStartupException检查AsyncMessageProcessingConsumer的启动结果。

三、AsyncMessageProcessingConsumer

AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类,是一个Runnable类,有3个成员变量,分别为:

private final BlockingQueueConsumer consumer;

private final CountDownLatch start;

private volatile FatalListenerStartupException startupException;

AsyncMessageProcessingConsumer由BlockingQueueConsumer封装而来,也只有AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer)一个构造函数,start是一个CountDownLatch变量,值为1,启动结束后通过this.start.countDown()变化为0。startupException用于保存run方法执行过程中抛出的特定异常。

1. AsyncMessageProcessingConsumer#getStartupException

AsyncMessageProcessingConsumer#getStartupException用于在启动结束后返回执行过程中出现的特定异常。事实上,这里的校验包括了两种情况:

private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
  if (!this.start.await(
    SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
    logger.error("Consumer failed to start in "
                 + SimpleMessageListenerContainer.this.consumerStartTimeout
                 + " milliseconds; does the task executor have enough threads to support the container "
                 + "concurrency?");
  }
  return this.startupException;
}

2. AsyncMessageProcessingConsumer#run

@Override
public void run() {
  //1.active校验
  if (!isActive()) {
    return;
  }

  //2. 初始化
  boolean aborted = false;

  int consecutiveIdles = 0;

  int consecutiveMessages = 0;

  this.consumer.setLocallyTransacted(isChannelLocallyTransacted());

  String routingLookupKey = getRoutingLookupKey();
  if (routingLookupKey != null) {
    SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey);
  }

  //3. queue数量校验
  if (this.consumer.getQueueCount() < 1) {
    if (logger.isDebugEnabled()) {
      logger.debug("Consumer stopping; no queues for " + this.consumer);
    }
    SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
    if (getApplicationEventPublisher() != null) {
      getApplicationEventPublisher().publishEvent(
        new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
    }
    this.start.countDown();
    return;
  }


  try {

    try {
      //4. 通过redeclareElementsIfNecessary方法进行重新声明和定义元素
      redeclareElementsIfNecessary();
      //5. consumer启动
      this.consumer.start();
      this.start.countDown();
    }
    catch (QueuesNotAvailableException e) {
      if (isMissingQueuesFatal()) {
        throw e;
      }
      else {
        this.start.countDown();
        handleStartupFailure(this.consumer.getBackOffExecution());
        throw e;
      }
    }
    catch (FatalListenerStartupException ex) {
      if (isPossibleAuthenticationFailureFatal()) {
        throw ex;
      }
      else {
        Throwable possibleAuthException = ex.getCause().getCause();
        if (possibleAuthException == null ||
            !(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
          throw ex;
        }
        else {
          this.start.countDown();
          handleStartupFailure(this.consumer.getBackOffExecution());
          throw possibleAuthException;
        }
      }
    }
    catch (Throwable t) { //NOSONAR
      this.start.countDown();
      handleStartupFailure(this.consumer.getBackOffExecution());
      throw t;
    }

    if (getTransactionManager() != null) {
      /*
  		 * Register the consumer's channel so it will be used by the transaction manager
     * if it's an instance of RabbitTransactionManager.
     */
      ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
    }

    //6.接收并处理消息
    while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
      try {
        boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
          if (receivedOk) {
            if (isActive(this.consumer)) {
              consecutiveIdles = 0;
              if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                considerAddingAConsumer();
                consecutiveMessages = 0;
              }
            }
          }
          else {
            consecutiveMessages = 0;
            if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
              considerStoppingAConsumer(this.consumer);
              consecutiveIdles = 0;
            }
          }
        }
        long idleEventInterval = getIdleEventInterval();
        if (idleEventInterval > 0) {
          if (receivedOk) {
            updateLastReceive();
          }
          else {
            long now = System.currentTimeMillis();
            long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
            long lastReceive = getLastReceive();
            if (now > lastReceive + idleEventInterval
                && now > lastAlertAt + idleEventInterval
                && SimpleMessageListenerContainer.this.lastNoMessageAlert
                .compareAndSet(lastAlertAt, now)) {
              publishIdleContainerEvent(now - lastReceive);
            }
          }
        }
      }
      catch (ListenerExecutionFailedException ex) {
        // Continue to process, otherwise re-throw
        if (ex.getCause() instanceof NoSuchMethodException) {
          throw new FatalListenerExecutionException("Invalid listener", ex);
        }
      }
      catch (AmqpRejectAndDontRequeueException rejectEx) {
        /*
  			 *  These will normally be wrapped by an LEFE if thrown by the
     *  listener, but we will also honor it if thrown by an
     *  error handler.
     */
      }
    }

  }
  catch (InterruptedException e) {
    logger.debug("Consumer thread interrupted, processing stopped.");
    Thread.currentThread().interrupt();
    aborted = true;
    publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
  }
  catch (QueuesNotAvailableException ex) {
    logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
    if (isMissingQueuesFatal()) {
      this.startupException = ex;
      // Fatal, but no point re-throwing, so just abort.
      aborted = true;
    }
    publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
  }
  catch (FatalListenerStartupException ex) {
    logger.error("Consumer received fatal exception on startup", ex);
    this.startupException = ex;
    // Fatal, but no point re-throwing, so just abort.
    aborted = true;
    publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
  }
  catch (FatalListenerExecutionException ex) {
    logger.error("Consumer received fatal exception during processing", ex);
    // Fatal, but no point re-throwing, so just abort.
    aborted = true;
    publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
  }
  catch (PossibleAuthenticationFailureException ex) {
    logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
                 " exception during processing", ex);
    if (isPossibleAuthenticationFailureFatal()) {
      this.startupException =
        new FatalListenerStartupException("Authentication failure",
                                          new AmqpAuthenticationException(ex));
      // Fatal, but no point re-throwing, so just abort.
      aborted = true;
    }
    publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
  }
  catch (ShutdownSignalException e) {
    if (RabbitUtils.isNormalShutdown(e)) {
      if (logger.isDebugEnabled()) {
        logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
      }
    }
    else {
      logConsumerException(e);
    }
  }
  catch (AmqpIOException e) {
    if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
        && e.getCause().getCause().getMessage().contains("in exclusive use")) {
      getExclusiveConsumerExceptionLogger().log(logger,
                                                "Exclusive consumer failure", e.getCause().getCause());
      publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
    }
    else {
      logConsumerException(e);
    }
  }
  catch (Error e) { //NOSONAR
    // ok to catch Error - we're aborting so will stop
    logger.error("Consumer thread error, thread abort.", e);
    publishConsumerFailedEvent("Consumer threw an Error", true, e);
    aborted = true;
  }
  catch (Throwable t) { //NOSONAR
    // by now, it must be an exception
    if (isActive()) {
      logConsumerException(t);
    }
  }
  finally {
    if (getTransactionManager() != null) {
      ConsumerChannelRegistry.unRegisterConsumerChannel();
    }
  }

  //7. 确保启动结束的标志start正确
  // In all cases count down to allow container to progress beyond startup
  this.start.countDown();

  //8. 终止或重启consumer
  if (!isActive(this.consumer) || aborted) {
    logger.debug("Cancelling " + this.consumer);
    try {
      this.consumer.stop();
      SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
      if (getApplicationEventPublisher() != null) {
        getApplicationEventPublisher().publishEvent(
          new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
      }
    }
    catch (AmqpException e) {
      logger.info("Could not cancel message consumer", e);
    }
    if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
        .compareAndSet(null, Thread.currentThread())) {
      logger.error("Stopping container from aborted consumer");
      stop();
      SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
      ListenerContainerConsumerFailedEvent event = null;
      do {
        try {
          event = SimpleMessageListenerContainer.this.abortEvents.poll(5, TimeUnit.SECONDS);
          if (event != null) {
            SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
              event.getReason(), event.isFatal(), event.getThrowable());
          }
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
      while (event != null);
    }
  }
  else {
    logger.info("Restarting " + this.consumer);
    restart(this.consumer);
  }

  if (routingLookupKey != null) {
    SimpleResourceHolder.unbind(getRoutingConnectionFactory());
  }
}

run方法很长,但除去异常处理的相关代码之后的逻辑也并不是很复杂,大致可以按以下步骤理解:

  1. active校验校验
  2. 变量初始化。一些变量的初始化,包括是否启动事务、路由的key值。consecutiveMessages表示连续接收到的消息数量, consecutiveIdles表示连续失败的数量
  3. queue数量校验
  4. 通过redeclareElementsIfNecessary方法进行重新声明和定义元素。自动删除队列会导致绑定关系、路由器等元素也被删除,因此有可能需要重新申明和定义
  5. consumer启动。正常的业务场景下,这一步执行结束表示了AyncMessageProcessingCosnumer启动完成。 通过consumer的start方法进行consumer的启动。
  6. 接收并处理消息: 这一步是一个循环.
  1. 确保启动结束的标志start正确
  2. 终止或重启consumer。根据consumer是否有效和是否被中断对consumer进行相应的终止或重启操作。

四、 消息接收与处理

1. SimpleMessageListenerContainer#receiveAndExecute

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {

  if (getTransactionManager() != null) {
    try {
      if (this.transactionTemplate == null) {
        this.transactionTemplate =
          new TransactionTemplate(getTransactionManager(), getTransactionAttribute());
      }
      return this.transactionTemplate
        .execute(status -> {
          RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
            new RabbitResourceHolder(consumer.getChannel(), false),
            getConnectionFactory(), true);
          // unbound in ResourceHolderSynchronization.beforeCompletion()
          try {
            return doReceiveAndExecute(consumer);
          }
          catch (RuntimeException e1) {
            prepareHolderForRollback(resourceHolder, e1);
            throw e1;
          }
          catch (Throwable e2) { //NOSONAR
            // ok to catch Throwable here because we re-throw it below
            throw new WrappedTransactionException(e2);
          }
        });
    }
    catch (WrappedTransactionException e) {
      throw e.getCause();
    }
  }
  return doReceiveAndExecute(consumer);
}

receiveAndExecute委托doReceiveAndExecute方法进行具体的业务逻辑,如果配置了事务,在抛出异常的场景下由prepareHolderForRollback方法进行回滚。

2. SimpleMessageListenerContainer#doReveiveAndExecute

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
  Channel channel = consumer.getChannel();
  for (int i = 0; i < this.txSize; i++) {
    logger.trace("Waiting for message from consumer.");
    Message message = consumer.nextMessage(this.receiveTimeout);
    if (message == null) {
      break;
    }
    try {
      executeListener(channel, message);
    }
    catch (ImmediateAcknowledgeAmqpException e) {
      if (this.logger.isDebugEnabled()) {
        this.logger.debug("User requested ack for failed delivery '"
                          + e.getMessage() + "': "
                          + message.getMessageProperties().getDeliveryTag());
      }
      break;
    }
    catch (Throwable ex) { //NOSONAR
      if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
        if (this.logger.isDebugEnabled()) {
          this.logger.debug("User requested ack for failed delivery: "
                            + message.getMessageProperties().getDeliveryTag());
        }
        break;
      }
      if (getTransactionManager() != null) {
        if (getTransactionAttribute().rollbackOn(ex)) {
          RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
            .getResource(getConnectionFactory());
          if (resourceHolder != null) {
            consumer.clearDeliveryTags();
          }
          else {
            /*
  					 * If we don't actually have a transaction, we have to roll back
       * manually. See prepareHolderForRollback().
      		*/
            consumer.rollbackOnExceptionIfNecessary(ex);
          }
          throw ex; // encompassing transaction will handle the rollback.
        }
        else {
          if (this.logger.isDebugEnabled()) {
            this.logger.debug("No rollback for " + ex);
          }
          break;
        }
      }
      else {
        consumer.rollbackOnExceptionIfNecessary(ex);
        throw ex;
      }
    }
  }
  return consumer.commitIfNecessary(isChannelLocallyTransacted());
}

doReceiveAndExecute又委托了executeListener(channel, message);进行处理,剩余部分的代码都是针对事务的处理。

3. SimpleMessageListenerContainer#nextMessage

public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
  if (logger.isTraceEnabled()) {
    logger.trace("Retrieving delivery for " + this);
  }
  checkShutdown();
  if (this.missingQueues.size() > 0) {
    checkMissingQueues();
  }
  Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
  if (message == null && this.cancelled.get()) {
    throw new ConsumerCancelledException();
  }
  return message;
}
private Message handle(Delivery delivery) throws InterruptedException {
  if ((delivery == null && this.shutdown != null)) {
    throw this.shutdown;
  }
  if (delivery == null) {
    return null;
  }
  byte[] body = delivery.getBody();
  Envelope envelope = delivery.getEnvelope();

  MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(
    delivery.getProperties(), envelope, "UTF-8");
  messageProperties.setConsumerTag(delivery.getConsumerTag());
  messageProperties.setConsumerQueue(this.consumerTags.get(delivery.getConsumerTag()));
  Message message = new Message(body, messageProperties);
  if (logger.isDebugEnabled()) {
    logger.debug("Received message: " + message);
  }
  this.deliveryTags.add(messageProperties.getDeliveryTag());
  if (this.transactional && !this.locallyTransacted) {
    ConnectionFactoryUtils.registerDeliveryTag(
      this.connectionFactory, this.channel,delivery.getEnvelope().getDeliveryTag());
  }
  return message;
}

nextMessage方法用于从rabbit中拉去数据然后组装成message格式。

4. AbstractMessageListenerContainer#executeListener

protected void executeListener(Channel channel, Message messageIn) throws Exception {
  if (!isRunning()) {
    if (logger.isWarnEnabled()) {
      logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
    }
    throw new MessageRejectedWhileStoppingException();
  }
  try {
    Message message = messageIn;
    if (this.afterReceivePostProcessors != null) {
      for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
        message = processor.postProcessMessage(message);
        if (message == null) {
          throw new ImmediateAcknowledgeAmqpException(
            "Message Post Processor returned 'null', discarding message");
        }
      }
    }
    Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
    if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat) && this.deBatchingEnabled) {
      ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
      MessageProperties messageProperties = message.getMessageProperties();
      messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
      while (byteBuffer.hasRemaining()) {
        int length = byteBuffer.getInt();
        if (length < 0 || length > byteBuffer.remaining()) {
          throw new ListenerExecutionFailedException(
            "Bad batched message received",
            new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
            message);
        }
        byte[] body = new byte[length];
        byteBuffer.get(body);
        messageProperties.setContentLength(length);
        // Caveat - shared MessageProperties.
        Message fragment = new Message(body, messageProperties);
        invokeListener(channel, fragment);
      }
    }
    else {
      invokeListener(channel, message);
    }
  }
  catch (Exception ex) {
    if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
      if (this.statefulRetryFatalWithNullMessageId) {
        throw new FatalListenerExecutionException(
          "Illegal null id in message. Failed to manage retry for message: " + messageIn);
      }
      else {
        throw new ListenerExecutionFailedException(
          "Cannot retry message more than once without an ID",
          new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
          messageIn);
      }
    }
    handleListenerException(ex);
    throw ex;
  }
}

可以看出executeListener方法内只是控制了业务流程,校验通过后,由后处理器MessagePostProcessor进行预处理,对批量型的springBatchFormat进行message处理,委托invokeListener方法进行具体的业务代码实现,如果出现异常,由handleListenerException处理异常。

5. AbstractMessageListenerContainer#invokeListener

protected void invokeListener(Channel channel, Message message) throws Exception {
  this.proxy.invokeListener(channel, message);
}

AbstractMessageListenerContainer#invokeListener方法最终是执行proxy的invokeListener方法,查看proxy的属性值。

private final ContainerDelegate delegate = this::actualInvokeListener;
private ContainerDelegate proxy = this.delegate;

可以看出,最终回到了AbstractMessageListenerContainer#actualInvokeListener方法。

6. AbstractMessageListenerContainer#actualInvokeListener

protected void actualInvokeListener(Channel channel, Message message) throws Exception {
  Object listener = getMessageListener();
  if (listener instanceof ChannelAwareMessageListener) {
    doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
  }
  else if (listener instanceof MessageListener) {
    boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
    if (bindChannel) {
      RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
      resourceHolder.setSynchronizedWithTransaction(true);
      TransactionSynchronizationManager.bindResource(
        this.getConnectionFactory(),resourceHolder);
    }
    try {
      doInvokeListener((MessageListener) listener, message);
    }
    finally {
      if (bindChannel) {
        // unbind if we bound
        TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
      }
    }
  }
  else if (listener != null) {
    throw new FatalListenerExecutionException(
      "Only MessageListener and SessionAwareMessageListener supported: "
      + listener);
  }
  else {
    throw new FatalListenerExecutionException(
      "No message listener specified - see property 'messageListener'");
  }
}

由于AbstractMessageListener的属性messageListener是Object类型的,因此在处理记得逻辑时需要根据messageListener的真实类型做不同处理。

7. AbstractMessageListenerContainer#doInvokeListener(MessageListener, Message)

protected void doInvokeListener(MessageListener listener, Message message) throws Exception {
  try {
    listener.onMessage(message);
  }
  catch (Exception e) {
    throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
  }
}

对于MessageListener类型的监听器,由MessageListener#onMessage处理即可。

8.AbstractMessageListenerContainer#doInvokeListener(ChannelAwareMessageListener, Channel, Message)

/**
 * Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially
 * with its own transaction) to the listener if demanded.
 * @param listener the Spring ChannelAwareMessageListener to invoke
 * @param channel the Rabbit Channel to operate on
 * @param message the received Rabbit Message
 * @throws Exception if thrown by Rabbit API methods or listener itself.
 * <p>
 * Exception thrown from listener will be wrapped to {@link ListenerExecutionFailedException}.
 * @see ChannelAwareMessageListener
 * @see #setExposeListenerChannel(boolean)
 */
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
  throws Exception {

  RabbitResourceHolder resourceHolder = null;
  Channel channelToUse = channel;
  boolean boundHere = false;
  try {
    if (!isExposeListenerChannel()) {
      // We need to expose a separate Channel.
      resourceHolder = getTransactionalResourceHolder();
      channelToUse = resourceHolder.getChannel();
      /*
    		 * If there is a real transaction, the resource will have been bound; otherwise
    		 * we need to bind it temporarily here. Any work done on this channel
    		 * will be committed in the finally block.
    		 */
      if (isChannelLocallyTransacted() &&
          !TransactionSynchronizationManager.isActualTransactionActive()) {
        resourceHolder.setSynchronizedWithTransaction(true);
        TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
                                                       resourceHolder);
        boundHere = true;
      }
    }
    else {
      // if locally transacted, bind the current channel to make it available to RabbitTemplate
      if (isChannelLocallyTransacted()) {
        RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false);
        localResourceHolder.setSynchronizedWithTransaction(true);
        TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
                                                       localResourceHolder);
        boundHere = true;
      }
    }
    // Actually invoke the message listener...
    try {
      listener.onMessage(message, channelToUse);
    }
    catch (Exception e) {
      throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
    }
  }
  finally {
    if (resourceHolder != null && boundHere) {
      // so the channel exposed (because exposeListenerChannel is false) will be closed
      resourceHolder.setSynchronizedWithTransaction(false);
    }
    ConnectionFactoryUtils.releaseResources(resourceHolder);
    if (boundHere) {
      // unbind if we bound
      TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
      if (!isExposeListenerChannel() && isChannelLocallyTransacted()) {
        /*
    			 *  commit the temporary channel we exposed; the consumer's channel
    			 *  will be committed later. Note that when exposing a different channel
    			 *  when there's no transaction manager, the exposed channel is committed
    			 *  on each message, and not based on txSize.
    			 */
        RabbitUtils.commitIfNecessary(channelToUse);
      }
    }
  }
}

对于ChannelAwareMessageListener类型的监听器,除去事务的处理以外,同样也是交给ChannelAwareMessageListener#onMessage进行处理。