Springboot RabbitMq Source Code Analysis: Consumer Management and AmqpEvent

Previously

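The previous article, Springboot RabbitMq Source Code Analysis: the Consumer Container SimpleMessageListenerContainer, mainly covered how SimpleMessageListenerContainer starts and how AsyncMessageProcessingConsumer receives and processes messages. This article analyzes consumer management and AmqpEvent, filling in the points that the previous article did not mention or expand on.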

I. Consumer state management

AsyncMessageProcessingConsumer#run manages its consumer field, of type BlockingQueueConsumer, as needed.

1. BlockingQueueConsumer#start

Starts the consumer and gets it ready to receive messages.

public void start() throws AmqpException {
  if (logger.isDebugEnabled()) {
    logger.debug("Starting consumer " + this);
  }

  this.thread = Thread.currentThread();

  try {
    this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
                                                                                this.transactional);
    this.channel = this.resourceHolder.getChannel();
    addRecoveryListener();
  }
  catch (AmqpAuthenticationException e) {
    throw new FatalListenerStartupException("Authentication failure", e);
  }
  this.consumer = new InternalConsumer(this.channel);
  this.deliveryTags.clear();
  this.activeObjectCounter.add(this);

  // mirrored queue might be being moved
  int passiveDeclareRetries = this.declarationRetries;
  this.declaring = true;
  do {
    if (cancelled()) {
      break;
    }
    try {
      attemptPassiveDeclarations();
      if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
        logger.info("Queue declaration succeeded after retrying");
      }
      passiveDeclareRetries = 0;
    }
    catch (DeclarationException e) {
      if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
        if (logger.isWarnEnabled()) {
          logger.warn("Queue declaration failed; retries left=" + (passiveDeclareRetries), e);
          try {
            Thread.sleep(this.failedDeclarationRetryInterval);
          }
          catch (InterruptedException e1) {
            this.declaring = false;
            Thread.currentThread().interrupt();
            this.activeObjectCounter.release(this);
            throw RabbitExceptionTranslator.convertRabbitAccessException(e1);
          }
        }
      }
      else if (e.getFailedQueues().size() < this.queues.length) {
        if (logger.isWarnEnabled()) {
          logger.warn("Not all queues are available; only listening on those that are - configured: "
                      + Arrays.asList(this.queues) + "; not available: " + e.getFailedQueues());
        }
        this.missingQueues.addAll(e.getFailedQueues());
        this.lastRetryDeclaration = System.currentTimeMillis();
      }
      else {
        this.declaring = false;
        this.activeObjectCounter.release(this);
        throw new QueuesNotAvailableException("Cannot prepare queue for listener. "
                                              + "Either the queue doesn't exist or the broker will not allow us to use it.", e);
      }
    }
  }
  while (passiveDeclareRetries-- > 0 && !cancelled());
  this.declaring = false;

  if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
    // Set basicQos before calling basicConsume (otherwise if we are not acking the broker
    // will send blocks of 100 messages)
    try {
      this.channel.basicQos(this.prefetchCount);
    }
    catch (IOException e) {
      this.activeObjectCounter.release(this);
      throw new AmqpIOException(e);
    }
  }

  try {
    if (!cancelled()) {
      for (String queueName : this.queues) {
        if (!this.missingQueues.contains(queueName)) {
          consumeFromQueue(queueName);
        }
      }
    }
  }
  catch (IOException e) {
    throw RabbitExceptionTranslator.convertRabbitAccessException(e);
  }
}

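Its main steps are:

  1. Wrapping the consumer field: BlockingQueueConsumer has its own consumer field, of type InternalConsumer, created by wrapping the channel.
  2. Declaring the queues: queue declaration runs in a retry-on-failure loop. The local variable passiveDeclareRetries (initialized from the field declarationRetries) counts the remaining retries, and the field declaring marks that a declaration is in progress. As the code comment says, the retries mainly target mirrored queues, which may be in the middle of being moved, although a declaration can fail for other reasons too. Once retries are exhausted, a partial failure records the unavailable queues in missingQueues (the consumer then listens only on the available ones), while a total failure throws QueuesNotAvailableException.
  3. Setting basicQos: when not in auto-ack mode, basicQos(prefetchCount) is called before basicConsume to limit the number of unacknowledged messages; once prefetchCount messages are outstanding without an ack, the broker stops delivering new messages to this consumer.
  4. Starting consumption: consumeFromQueue is called for every queue that is not in missingQueues; the consumer tag returned by the broker is stored together with its queue name in the consumerTags map.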
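The retry loop in step 2 can be sketched in plain Java. This is a simplified model, not the Spring AMQP code: the Declarer interface and the declareWithRetries method are hypothetical, the partial-failure branch (missingQueues) and the cancelled() check are left out, and any RuntimeException counts as a declaration failure. It keeps the same do/while shape with a post-decrement condition, so with declarationRetries = 3 a permanently failing declaration is attempted 4 times (one initial attempt plus 3 retries) before the exception propagates.

```java
// Simplified model of the passive-declaration retry loop in BlockingQueueConsumer#start.
// Declarer and declareWithRetries are hypothetical names used only for this sketch.
class PassiveDeclareRetrySketch {

    // Stand-in for attemptPassiveDeclarations(); throws an unchecked exception on failure.
    interface Declarer {
        void declare();
    }

    // Returns the number of attempts needed; rethrows the last failure once retries run out.
    static int declareWithRetries(Declarer declarer, int declarationRetries, long retryIntervalMs) {
        int retriesLeft = declarationRetries;
        int attempts = 0;
        do {
            attempts++;
            try {
                declarer.declare();
                return attempts; // declaration succeeded
            }
            catch (RuntimeException e) {
                if (retriesLeft <= 0) {
                    throw e; // no retries left: give up
                }
                sleepBeforeRetry(retryIntervalMs); // plays the role of failedDeclarationRetryInterval
            }
        }
        while (retriesLeft-- > 0); // same post-decrement loop condition as the original
        throw new IllegalStateException("unreachable: the loop either returns or throws");
    }

    private static void sleepBeforeRetry(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the interrupt flag, as the original does
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }
}
```

A declaration that fails twice and then succeeds returns 3 attempts; one that never succeeds throws after 1 + declarationRetries attempts.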

2. BlockingQueueConsumer#stop

Stops the BlockingQueueConsumer so that it no longer consumes from RabbitMQ.

public void stop() {
  if (this.abortStarted == 0) { // signal handle delivery to use offer
    this.abortStarted = System.currentTimeMillis();
  }
  if (this.consumer != null && this.consumer.getChannel() != null && this.consumerTags.size() > 0
      && !this.cancelled.get()) {
    try {
      RabbitUtils.closeMessageConsumer(this.consumer.getChannel(), this.consumerTags.keySet(),
                                       this.transactional);
    }
    catch (Exception e) {
      if (logger.isDebugEnabled()) {
        logger.debug("Error closing consumer " + this, e);
      }
    }
  }
  if (logger.isDebugEnabled()) {
    logger.debug("Closing Rabbit Channel: " + this.channel);
  }
  RabbitUtils.setPhysicalCloseRequired(this.channel, true);
  ConnectionFactoryUtils.releaseResources(this.resourceHolder);
  this.deliveryTags.clear();
  this.consumer = null;
  this.queue.clear(); // in case we still have a client thread blocked
}

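It records when the abort started (only if that has not been set yet), cancels the consumer's tags on the channel via RabbitUtils.closeMessageConsumer, marks the channel for physical close and releases the resource holder, and then clears deliveryTags, the consumer reference, and the internal queue so that no client thread stays blocked on it.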

3. SimpleMessageListenerContainer#restart

Although SimpleMessageListenerContainer#restart is not a BlockingQueueConsumer method, it also operates on and manages BlockingQueueConsumer: it can be understood as replacing the old BlockingQueueConsumer with a newly created one.

private void restart(BlockingQueueConsumer oldConsumer) {
  BlockingQueueConsumer consumer = oldConsumer;
  synchronized (this.consumersMonitor) {
    if (this.consumers != null) {
      try {
        // Need to recycle the channel in this consumer
        consumer.stop();
        // Ensure consumer counts are correct (another is going
        // to start because of the exception, but
        // we haven't counted down yet)
        this.cancellationLock.release(consumer);
        this.consumers.remove(consumer);
        if (!isActive()) {
          // Do not restart - container is stopping
          return;
        }
        BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
        newConsumer.setBackOffExecution(consumer.getBackOffExecution());
        consumer = newConsumer;
        this.consumers.add(consumer);
        if (getApplicationEventPublisher() != null) {
          getApplicationEventPublisher()
            .publishEvent(new AsyncConsumerRestartedEvent(this, oldConsumer, newConsumer));
        }
      }
      catch (RuntimeException e) {
        logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
        // Re-throw and have it logged properly by the caller.
        throw e;
      }
      getTaskExecutor()
        .execute(new AsyncMessageProcessingConsumer(consumer));
    }
  }
}

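The restart method stops the old consumer via BlockingQueueConsumer#stop, then creates a new consumer, wraps it in an AsyncMessageProcessingConsumer, and submits it to the task executor. Between stopping the old consumer and creating the new one it checks isActive() again; if the container is no longer active (it is stopping), restart returns without creating a new consumer. It also copies the old consumer's BackOffExecution to the new one and, when an ApplicationEventPublisher is available, publishes an AsyncConsumerRestartedEvent.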
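The order of operations in restart can be modelled without a broker. In this sketch the Consumer class, the string events, and the active flag are hypothetical stand-ins for BlockingQueueConsumer, AsyncConsumerRestartedEvent, and isActive(); submitting the new consumer to the task executor and copying the BackOffExecution are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the step order in SimpleMessageListenerContainer#restart.
class RestartSketch {

    // Minimal stand-in for BlockingQueueConsumer; only tracks whether stop() was called.
    static class Consumer {
        final int id;
        boolean stopped;

        Consumer(int id) {
            this.id = id;
        }

        void stop() {
            stopped = true;
        }
    }

    final Object consumersMonitor = new Object();
    final Set<Consumer> consumers = new LinkedHashSet<>();
    final List<String> publishedEvents = new ArrayList<>();
    volatile boolean active = true; // stands in for isActive()
    private int nextId = 1;

    Consumer createConsumer() {
        return new Consumer(nextId++);
    }

    // Stop and remove the old consumer, re-check isActive(), then add a replacement and publish an event.
    Consumer restart(Consumer oldConsumer) {
        synchronized (consumersMonitor) {
            oldConsumer.stop();
            consumers.remove(oldConsumer);
            if (!active) {
                return null; // container is stopping: do not restart
            }
            Consumer newConsumer = createConsumer();
            consumers.add(newConsumer);
            publishedEvents.add("restarted " + oldConsumer.id + " -> " + newConsumer.id);
            return newConsumer; // the real container now runs it on the task executor
        }
    }
}
```

Holding consumersMonitor for the whole sequence means other threads never observe the consumers set with the old consumer removed but the new one not yet added.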

II. Consumer count management

Inside the receive-and-process loop of AsyncMessageProcessingConsumer#run, the number of BlockingQueueConsumer instances is adjusted according to whether messages are being received.

boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
  if (receivedOk) {
    if (isActive(this.consumer)) {
      consecutiveIdles = 0;
      if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
        considerAddingAConsumer();
        consecutiveMessages = 0;
      }
    }
  }
  else {
    consecutiveMessages = 0;
    if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
      considerStoppingAConsumer(this.consumer);
      consecutiveIdles = 0;
    }
  }
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
  if (receivedOk) {
    updateLastReceive();
  }
  else {
    long now = System.currentTimeMillis();
    long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
    long lastReceive = getLastReceive();
    if (now > lastReceive + idleEventInterval
        && now > lastAlertAt + idleEventInterval
        && SimpleMessageListenerContainer.this.lastNoMessageAlert
        .compareAndSet(lastAlertAt, now)) {
      publishIdleContainerEvent(now - lastReceive);
    }
  }
}

As shown, both the consumer count adjustment and the idle-event check that follow depend on receivedOk, the return value of receiveAndExecute.

1. The return value receivedOk

Following receiveAndExecute inward shows that receivedOk comes from the statement return consumer.commitIfNecessary(isChannelLocallyTransacted()); in SimpleMessageListenerContainer#doReceiveAndExecute, that is, from the return value of BlockingQueueConsumer#commitIfNecessary.

/**
 * Perform a commit or message acknowledgement, as appropriate.
 * @param locallyTransacted Whether the channel is locally transacted.
 * @return true if at least one delivery tag exists.
 * @throws IOException Any IOException.
 */
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {

  if (this.deliveryTags.isEmpty()) {
    return false;
  }

  /*
   * If we have a TX Manager, but no TX, act like we are locally transacted.
   */
  boolean isLocallyTransacted = locallyTransacted
    || (this.transactional
        && TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
  try {

    boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();

    if (ackRequired) {
      if (!this.transactional || isLocallyTransacted) {
        long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
        this.channel.basicAck(deliveryTag, true);
      }
    }

    if (isLocallyTransacted) {
      // For manual acks we still need to commit
      RabbitUtils.commitIfNecessary(this.channel);
    }

  }
  finally {
    this.deliveryTags.clear();
  }

  return true;

}

As shown, the result of BlockingQueueConsumer#commitIfNecessary depends solely on whether the deliveryTags field is empty: this.deliveryTags.isEmpty().

Tracing the deliveryTags field shows that elements are added to it only when a message is received, in the statement this.deliveryTags.add(messageProperties.getDeliveryTag()); in BlockingQueueConsumer#handle. After processing, whether it completes normally or throws, deliveryTags is cleared by BlockingQueueConsumer#clearDeliveryTags, BlockingQueueConsumer#rollbackOnExceptionIfNecessary, or BlockingQueueConsumer#commitIfNecessary. Therefore receivedOk indicates whether the consumer successfully polled at least one message within the timeout (receiveTimeout, 1000 ms by default).
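The following sketch models only the AUTO acknowledge mode, non-transactional path of commitIfNecessary, to show why its return value amounts to "at least one message was received since the last call". RecordingChannel, onDelivery, and CommitIfNecessarySketch are hypothetical. deliveryTags is modelled as a LinkedHashSet so that the last element is the most recently received tag, and a single basicAck(lastTag, true) acknowledges every outstanding tag up to and including it.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the AUTO-ack, non-transactional path of BlockingQueueConsumer#commitIfNecessary.
class CommitIfNecessarySketch {

    // Stand-in for the Rabbit channel: records each basicAck(deliveryTag, multiple) call.
    static class RecordingChannel {
        final List<String> acks = new ArrayList<>();

        void basicAck(long deliveryTag, boolean multiple) {
            acks.add(deliveryTag + ":" + multiple);
        }
    }

    // Insertion-ordered, so the last element is the most recently received delivery tag.
    final Set<Long> deliveryTags = new LinkedHashSet<>();
    final RecordingChannel channel = new RecordingChannel();

    // Stand-in for handle(): remembers the tag of each received message.
    void onDelivery(long deliveryTag) {
        deliveryTags.add(deliveryTag);
    }

    // Returns false when nothing was received since the last call; otherwise acks and returns true.
    boolean commitIfNecessary() {
        if (deliveryTags.isEmpty()) {
            return false;
        }
        try {
            long lastTag = new ArrayList<>(deliveryTags).get(deliveryTags.size() - 1);
            channel.basicAck(lastTag, true); // multiple=true acks every tag up to lastTag
        }
        finally {
            deliveryTags.clear(); // always cleared, so the next round starts empty
        }
        return true;
    }
}
```

After deliveries 5, 6, and 7, one call issues a single ack for tag 7 with multiple=true and returns true; an immediately following call returns false because the set was cleared.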

2. Related fields

Before analyzing the logic that adjusts the number of consumers, it helps to know the related fields of SimpleMessageListenerContainer.

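  1. SimpleMessageListenerContainer#receiveTimeout: as mentioned above, the timeout of each poll for a message by a consumer; default 1000 ms.
  2. SimpleMessageListenerContainer#consecutiveActiveTrigger: the threshold for consecutive rounds in which a message is received; when the number of consecutive successful rounds exceeds it, the method that considers adding a consumer is triggered. Default 10.
  3. SimpleMessageListenerContainer#consecutiveIdleTrigger: the threshold for consecutive rounds in which no message is received; when the number of consecutive empty rounds exceeds it, the method that considers stopping a consumer is triggered. Default 10.
  4. SimpleMessageListenerContainer#maxConcurrentConsumers: the maximum number of concurrent consumers; the count is adjusted only when this is not null. Default null.
  5. SimpleMessageListenerContainer#concurrentConsumers: the initial number of concurrent consumers, which also acts as the minimum; the container never scales below it. Default 1.
  6. SimpleMessageListenerContainer#lastConsumerStarted: the timestamp of the last consumer addition, initially 0.
  7. SimpleMessageListenerContainer#lastConsumerStopped: the timestamp of the last consumer removal, initially 0.
  8. SimpleMessageListenerContainer#startConsumerMinInterval: the minimum interval between two consumer additions; default 10000 ms.
  9. SimpleMessageListenerContainer#stopConsumerMinInterval: the minimum interval between two consumer removals; default 60000 ms.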

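Note also that consecutiveActiveTrigger and consecutiveIdleTrigger only trigger the methods that consider adding or removing a consumer; they do not guarantee that a consumer is actually added or removed, because those methods perform further checks.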

3. Triggering a consumer count adjustment

if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
  if (receivedOk) {
    if (isActive(this.consumer)) {
      consecutiveIdles = 0;
      if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
        considerAddingAConsumer();
        consecutiveMessages = 0;
      }
    }
  }
  else {
    consecutiveMessages = 0;
    if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
      considerStoppingAConsumer(this.consumer);
      consecutiveIdles = 0;
    }
  }
}

Back in AsyncMessageProcessingConsumer#run, a consumer count adjustment can only be triggered when SimpleMessageListenerContainer#maxConcurrentConsumers has been set.

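consecutiveMessages and consecutiveIdles count the consecutive rounds in which a message was and was not received, respectively. Note that the comparisons use consecutiveMessages++ rather than ++consecutiveMessages (and likewise consecutiveIdles++), so the value before the increment is compared with the trigger. When a count exceeds its trigger, the corresponding method is called and the count is reset to 0; a successful round on an active consumer also resets consecutiveIdles, and an empty round resets consecutiveMessages.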
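To see what the post-increment means in practice, the hypothetical helper below replays the consecutiveMessages counter over a run of successful rounds, assuming the consumer stays active throughout. With consecutiveActiveTrigger = 10, considerAddingAConsumer is first called on the 12th consecutive successful round, not the 11th, and the counter then restarts from 0. The same arithmetic applies to consecutiveIdles and consecutiveIdleTrigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the consecutiveMessages counter in AsyncMessageProcessingConsumer#run.
class TriggerCounterSketch {

    // Returns the 1-based rounds on which considerAddingAConsumer() would be called,
    // assuming every round receives a message and the consumer stays active.
    static List<Integer> triggerRounds(int consecutiveActiveTrigger, int rounds) {
        List<Integer> fired = new ArrayList<>();
        int consecutiveMessages = 0;
        for (int round = 1; round <= rounds; round++) {
            if (consecutiveMessages++ > consecutiveActiveTrigger) { // compares the old value
                fired.add(round);
                consecutiveMessages = 0; // reset after triggering, as in the container
            }
        }
        return fired;
    }
}
```

For example, triggerRounds(10, 30) yields rounds 12 and 24.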

4. Adding consumers

private void considerAddingAConsumer() {
  synchronized (this.consumersMonitor) {
    if (this.consumers != null
        && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
      long now = System.currentTimeMillis();
      if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
        this.addAndStartConsumers(1);
        this.lastConsumerStarted = now;
      }
    }
  }
}

protected void addAndStartConsumers(int delta) {
  synchronized (this.consumersMonitor) {
    if (this.consumers != null) {
      for (int i = 0; i < delta; i++) {
        BlockingQueueConsumer consumer = createBlockingQueueConsumer();
        this.consumers.add(consumer);
        AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
        if (logger.isDebugEnabled()) {
          logger.debug("Starting a new consumer: " + consumer);
        }
        getTaskExecutor().execute(processor);
        if (this.getApplicationEventPublisher() != null) {
          this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
        }
        try {
          FatalListenerStartupException startupException = processor.getStartupException();
          if (startupException != null) {
            this.consumers.remove(consumer);
            throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
          }
        }
        catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
        catch (Exception e) {
          consumer.stop();
          logger.error("Error starting new consumer", e);
          this.cancellationLock.release(consumer);
          this.consumers.remove(consumer);
        }
      }
    }
  }
}

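Adding a consumer takes the consumersMonitor lock and re-checks the conditions, ensuring that the consumer count never exceeds maxConcurrentConsumers and that the time since the last addition is longer than startConsumerMinInterval. It then creates a new BlockingQueueConsumer, wraps it in an AsyncMessageProcessingConsumer, starts it, and updates the addition timestamp lastConsumerStarted.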
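The two checks inside considerAddingAConsumer can be isolated as a pure function. mayAddConsumer is a hypothetical helper, not part of Spring AMQP, and it leaves out the consumersMonitor lock. Because lastConsumerStarted starts at 0, the interval check passes immediately for the first addition when now is a real System.currentTimeMillis() value.

```java
// Hypothetical pure-function version of the checks in considerAddingAConsumer (lock omitted).
class AddConsumerGateSketch {

    static boolean mayAddConsumer(int currentConsumers, Integer maxConcurrentConsumers,
                                  long lastConsumerStarted, long startConsumerMinInterval, long now) {
        return maxConcurrentConsumers != null                            // scaling enabled at all
                && currentConsumers < maxConcurrentConsumers             // stay below the maximum
                && lastConsumerStarted + startConsumerMinInterval < now; // respect the min interval
    }
}
```

The interval comparison is strict: with startConsumerMinInterval = 10000, an addition at exactly 10000 ms after the previous one is still rejected.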

5. Removing consumers

private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
  synchronized (this.consumersMonitor) {
    if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
      long now = System.currentTimeMillis();
      if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
        consumer.basicCancel(true);
        this.consumers.remove(consumer);
        if (logger.isDebugEnabled()) {
          logger.debug("Idle consumer terminating: " + consumer);
        }
        this.lastConsumerStopped = now;
      }
    }
  }
}

// BlockingQueueConsumer#basicCancel
protected void basicCancel(boolean expected) {
  this.normalCancel = expected;
  for (String consumerTag : this.consumerTags.keySet()) {
    removeConsumer(consumerTag);
    try {
      if (this.channel.isOpen()) {
        this.channel.basicCancel(consumerTag);
      }
    }
    catch (IOException | IllegalStateException e) {
      if (logger.isDebugEnabled()) {
        logger.debug("Error performing 'basicCancel'", e);
      }
    }
    catch (AlreadyClosedException e) {
      if (logger.isTraceEnabled()) {
        logger.trace(this.channel + " is already closed");
      }
    }
  }
  this.abortStarted = System.currentTimeMillis();
}

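Removing a consumer also takes the consumersMonitor lock and re-checks the conditions, ensuring that the consumer count never drops below concurrentConsumers and that the time since the last removal is longer than stopConsumerMinInterval. It then cancels the current consumer via BlockingQueueConsumer#basicCancel, removes it from the consumers set, and updates the removal timestamp lastConsumerStopped.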
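The removal rules can likewise be replayed over a sequence of idle-trigger times. remainingAfter is a hypothetical helper that applies the two checks of considerStoppingAConsumer to each request; decrementing the count stands in for basicCancel(true) plus removal from the consumers set. With stopConsumerMinInterval = 60000 ms, requests that arrive within 60 s of the last removal are ignored, so under the defaults scaling down is slower than scaling up (60000 ms vs 10000 ms between steps).

```java
// Hypothetical replay of the checks in SimpleMessageListenerContainer#considerStoppingAConsumer.
class StopConsumerGateSketch {

    // Applies the removal rules to each idle-trigger time and returns the remaining consumer count.
    static int remainingAfter(int consumers, int concurrentConsumers,
                              long stopConsumerMinInterval, long... requestTimes) {
        long lastConsumerStopped = 0L; // initial value in the container
        for (long now : requestTimes) {
            if (consumers > concurrentConsumers
                    && lastConsumerStopped + stopConsumerMinInterval < now) {
                consumers--;               // stands in for basicCancel(true) + consumers.remove(consumer)
                lastConsumerStopped = now;
            }
        }
        return consumers;
    }
}
```

Starting from 4 consumers with concurrentConsumers = 1, requests at 100000, 110000, 170000, 230001, and 300000 ms remove consumers at 100000, 170000, and 230001; the request at 110000 is too soon, and the last one is refused because only the minimum of 1 consumer remains.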

III. AmqpEvent

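Springboot RabbitMq provides several ApplicationEvent types, and we can extend its behavior by implementing the corresponding ApplicationListener. AmqpEvent is an abstract class that extends ApplicationEvent and is the base class of the Springboot RabbitMq application events.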

1. AsyncConsumerRestartedEvent

An event that is published whenever a consumer is restarted.

Published when a consumer is restarted, in: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#restart(BlockingQueueConsumer)

/**
 * @param source the listener container.
 * @param oldConsumer the old consumer.
 * @param newConsumer the new consumer.
 */
public AsyncConsumerRestartedEvent(Object source, Object oldConsumer, Object newConsumer) {
  super(source);
  this.oldConsumer = oldConsumer;
  this.newConsumer = newConsumer;
}

The fields hold the container, the old consumer, and the new consumer.

2. AsyncConsumerStartedEvent

An event that is published whenever a new consumer is started.

Published when a new consumer is created, in:

  1. org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer#doConsumeFromQueue(String)
  2. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart()
  3. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#addAndStartConsumers(int)

/**
 * @param source the listener container.
 * @param consumer the old consumer.
 */
public AsyncConsumerStartedEvent(Object source, Object consumer) {
  super(source);
  this.consumer = consumer;
}

The fields hold the container and the newly created consumer.

3. AsyncConsumerStoppedEvent

An event that is published whenever a consumer is stopped (and not restarted).

Published when a consumer is stopped, in: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run()

public AsyncConsumerStoppedEvent(Object source, Object consumer) {
  super(source);
  this.consumer = consumer;
}

The fields hold the container and the stopped consumer.

4. ConnectionBlockedEvent

The {@link AmqpEvent} emitted by the {@code CachingConnectionFactory} when its connections are blocked.

Published when a connection is blocked, in: org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.ConnectionBlockedListener#handleBlocked(String)

public ConnectionBlockedEvent(Connection source, String reason) {
  super(source);
  this.reason = reason;
}

The fields hold the connection and the reason it was blocked.

5. ConnectionUnblockedEvent

The {@link AmqpEvent} emitted by the {@code CachingConnectionFactory} when its connections are unblocked.

Published when a connection is unblocked, in: org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.ConnectionBlockedListener#handleUnblocked()

public ConnectionUnblockedEvent(Connection source) {
  super(source);
}

The field holds the connection.

6. ConsumeOkEvent

Published when a consumer starts and its consumer tag is obtained successfully, in:

  1. org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue(String)
  2. org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.SimpleConsumer#handleConsumeOk(String)

public ConsumeOkEvent(Object source, String queue, String consumerTag) {
  super(source);
  this.queue = queue;
  this.consumerTag = consumerTag;
}

The fields hold the consumer, the queue being consumed, and the consumer tag.

7. ListenerContainerConsumerFailedEvent

Published when a listener consumer fails.

Published when an exception occurs while a consumer is starting or consuming, in:

  1. org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#publishConsumerFailedEvent(String, boolean, Throwable)
  2. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#publishConsumerFailedEvent(String, boolean, Throwable)
/**
 * Construct an instance with the provided arguments.
 * @param source the source container.
 * @param reason the reason.
 * @param throwable the throwable.
 * @param fatal true if the startup failure was fatal (will not be retried).
 */
public ListenerContainerConsumerFailedEvent(
  Object source, String reason, Throwable throwable, boolean fatal) {
  super(source);
  this.reason = reason;
  this.fatal = fatal;
  this.throwable = throwable;
}

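The fields hold the container, the failure reason, whether the failure is fatal (that is, will not be retried), and the thrown exception.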

8. ListenerContainerConsumerTerminatedEvent

Published when a listener consumer is terminated.

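As the name suggests, ListenerContainerConsumerTerminatedEvent is similar to the ListenerContainerConsumerFailedEvent above, but it covers a consumer that is terminated without an error, for example after being cancelled. It is also published in org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#publishConsumerFailedEvent(String, boolean, Throwable), in the case where the Throwable argument is null.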

public ListenerContainerConsumerTerminatedEvent(Object source, String reason) {
  super(source);
  this.reason = reason;
}

The fields hold the container and the termination reason.

9. ListenerContainerIdleEvent

An event that is emitted when a container is idle if the container is configured to do so.

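Published when the container has received no messages for idleEventInterval (only if idleEventInterval is configured), in: org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#publishIdleContainerEvent(long)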

public ListenerContainerIdleEvent(Object source, long idleTime, String id, String... queueNames) {
  super(source);
  this.idleTime = idleTime;
  this.listenerId = id;
  this.queueNames = Arrays.asList(queueNames);
}

The fields hold the container, the idle time, the listener id, and the queue names.
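The idle check in AsyncMessageProcessingConsumer#run, quoted in section II, uses the shared lastNoMessageAlert field (AtomicLong-like, judging from its get and compareAndSet calls) so that, even with several consumer threads, at most one ListenerContainerIdleEvent is published per idleEventInterval. The hypothetical class below reproduces that condition; checkIdle returns the idle time that would be passed to publishIdleContainerEvent, or -1 when nothing would be published. Times are passed in explicitly instead of calling System.currentTimeMillis() so that the behavior is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the idle-event check in AsyncMessageProcessingConsumer#run.
class IdleEventCheckSketch {

    private final AtomicLong lastNoMessageAlert = new AtomicLong(); // shared by all consumer threads
    private final long idleEventInterval;
    private volatile long lastReceive;

    IdleEventCheckSketch(long idleEventInterval, long startTime) {
        this.idleEventInterval = idleEventInterval;
        this.lastReceive = startTime;
    }

    // Called after a round that received a message (updateLastReceive in the container).
    void updateLastReceive(long now) {
        this.lastReceive = now;
    }

    // Called after an empty round; returns the idle time to publish, or -1 if no event is due.
    long checkIdle(long now) {
        long lastAlertAt = lastNoMessageAlert.get();
        if (now > lastReceive + idleEventInterval
                && now > lastAlertAt + idleEventInterval
                && lastNoMessageAlert.compareAndSet(lastAlertAt, now)) { // only one thread wins
            return now - lastReceive;
        }
        return -1;
    }
}
```

With idleEventInterval = 1000 and a start time of 0, an event is due at 1500 (idle time 1500), suppressed at 2000 because the previous alert was less than 1000 ms earlier, and due again at 2600.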

10. RabbitAdminEvent

Base class for admin events.

RabbitAdminEvent is the base class of RabbitAdmin-related events; it has one subclass, DeclarationExceptionEvent.

11. DeclarationExceptionEvent

Application event published when a declaration exception occurs.

Published when declaring a Declarable (such as a queue, an exchange, or a binding) fails, in: org.springframework.amqp.rabbit.core.RabbitAdmin#logOrRethrowDeclarationException(Declarable, String, Throwable)

public DeclarationExceptionEvent(Object source, Declarable declarable, Throwable t) {
  super(source);
  this.declarable = declarable;
  this.throwable = t;
}

The fields hold the RabbitAdmin that performed the declaration, the Declarable being declared, and the thrown exception.