Springboot RabbitMq源码解析之消息发送

/ MQ / 没有评论 / 1436浏览

Springboot RabbitMq源码解析之消息发送

前章

1. RabbitTemplate#convertAndSend

rabbitmq的发送可以通过AmqpTemplate接口实现,AmqpTemplate接口下重载了许多send和convertAndSend方法。send用于发送Message格式的数据,convertAndSend方法先将对象转化成Message数据后进行发送。

从前面的配置类中可以看到,默认使用的AmqpTemplate是RabbitTemplate,以RabbitTemplate#convertAndSend(String, String, Object)

@Override
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
  throws AmqpException {
  send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}

然后委托MessageConverter类将对象转化成Message数据

protected Message convertMessageIfNecessary(final Object object) {
  if (object instanceof Message) {
    return (Message) object;
  }
  return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}

2. SimpleMessageConverter#createMessage

MessageConverter接口只有两个方法,分别用于将对象转换成Message和将Message还原回对象。

RabbitTemplate#messageConvert的默认值是SimpleMessageConverter,其toMessage方法继承自父类AbstractMessageConverter,具体逻辑如下,其核心逻辑在于SimpleMessageConverter#createMessage中。

@Override
public final Message toMessage(Object object, MessageProperties messageProperties)
  throws MessageConversionException {
  if (messageProperties == null) {
    messageProperties = new MessageProperties();
  }
  Message message = createMessage(object, messageProperties);
  messageProperties = message.getMessageProperties();
  if (this.createMessageIds && messageProperties.getMessageId() == null) {
    messageProperties.setMessageId(UUID.randomUUID().toString());
  }
  return message;
}
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  byte[] bytes = null;
  if (object instanceof byte[]) {
    bytes = (byte[]) object;
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
  }
  else if (object instanceof String) {
    try {
      bytes = ((String) object).getBytes(this.defaultCharset);
    }
    catch (UnsupportedEncodingException e) {
      throw new MessageConversionException(
        "failed to convert to Message content", e);
    }
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    messageProperties.setContentEncoding(this.defaultCharset);
  }
  else if (object instanceof Serializable) {
    try {
      bytes = SerializationUtils.serialize(object);
    }
    catch (IllegalArgumentException e) {
      throw new MessageConversionException(
        "failed to convert to serialized Message content", e);
    }
    messageProperties.setContentType(
      MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
  }
  if (bytes != null) {
    messageProperties.setContentLength(bytes.length);
    return new Message(bytes, messageProperties);
  }
  throw new IllegalArgumentException(
    getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

SimpleMessageConverter#toMessage方法通过new Message(bytes, messageProperties)生成返回的对象。因此在转化对象时需要考虑的是如何将对象转化成字节数组以及对应的contentType。

转换成字节数组的方式根据对象类型有4种场景:

  1. 字节数组:不需要转化
  2. 字符串:直接使用String#getBytes
  3. 可序列化对象:进行序列化
  4. 其它:抛出异常

3. RabbitTemplate#send

RabbitTemplate内所有重载的send和convertAndSend方法都通过调用public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException来进行消息的发送,而这个方法有委托execute方法处理逻辑。

@Override
public void send(final String exchange, final String routingKey,
                 final Message message, final CorrelationData correlationData)
  throws AmqpException {
  execute(channel -> {
    doSend(channel, exchange, routingKey, message, RabbitTemplate.this.returnCallback != null
           && RabbitTemplate.this.mandatoryExpression.getValue(
             RabbitTemplate.this.evaluationContext, message, Boolean.class),
           correlationData);
    return null;
  }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}
private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
  if (this.retryTemplate != null) {
    try {
      return this.retryTemplate.execute(
        (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
        (RecoveryCallback<T>) this.recoveryCallback);
    }
    catch (Exception e) {
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      }
      throw RabbitExceptionTranslator.convertRabbitAccessException(e);
    }
  }
  else {
    return doExecute(action, connectionFactory);
  }
}

可以看到按照是否重试分两种场景,但核心逻辑都是 doExecute(action, connectionFactory)。

4. RabbitTemplate#doExecute

private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
  Assert.notNull(action, "Callback object must not be null");
  Channel channel = null;
  boolean invokeScope = false;
  // No need to check the thread local if we know that no invokes are in process
  if (this.activeTemplateCallbacks.get() > 0) {
    channel = this.dedicatedChannels.get();
  }
  RabbitResourceHolder resourceHolder = null;
  Connection connection = null; // NOSONAR (close)
  if (channel == null) {
    if (isChannelTransacted()) {
      resourceHolder = ConnectionFactoryUtils.
        getTransactionalResourceHolder(connectionFactory, true, this.usePublisherConnection);
      channel = resourceHolder.getChannel();
      if (channel == null) {
        ConnectionFactoryUtils.releaseResources(resourceHolder);
        throw new IllegalStateException("Resource holder returned a null channel");
      }
    }
    else {
      connection = ConnectionFactoryUtils.createConnection(connectionFactory,
                                                           this.usePublisherConnection); // NOSONAR - RabbitUtils closes
      if (connection == null) {
        throw new IllegalStateException("Connection factory returned a null connection");
      }
      try {
        channel = connection.createChannel(false);
        if (channel == null) {
          throw new IllegalStateException("Connection returned a null channel");
        }
      }
      catch (RuntimeException e) {
        RabbitUtils.closeConnection(connection);
        throw e;
      }
    }
  }
  else {
    invokeScope = true;
  }
  try {
    if (this.confirmsOrReturnsCapable == null) {
      determineConfirmsReturnsCapability(connectionFactory);
    }
    if (this.confirmsOrReturnsCapable) {
      addListener(channel);
    }
    if (logger.isDebugEnabled()) {
      logger.debug(
        "Executing callback " + action.getClass().getSimpleName() + " on RabbitMQ Channel: " + channel);
    }
    return action.doInRabbit(channel);
  }
  catch (Exception ex) {
    if (isChannelLocallyTransacted(channel)) {
      resourceHolder.rollbackAll();
    }
    throw convertRabbitAccessException(ex);
  }
  finally {
    if (!invokeScope) {
      if (resourceHolder != null) {
        ConnectionFactoryUtils.releaseResources(resourceHolder);
      }
      else {
        RabbitUtils.closeChannel(channel);
        RabbitUtils.closeConnection(connection);
      }
    }
  }
}

除去其中的一些校验和异常抛出,其中的逻辑可以分为3步:

  1. 创建连接connection和渠道channel,根据是否开启事务机制,创建连接和渠道的方式不同。如果开启了事务,根据ResourceHolder获取channel。如果没有开启事务,创建connection,然后根据connection创建channel。
  2. 执行ChannelCallback#doInRabbit方法,这里的方法逻辑在RabbitTemplate#send发送中通过Lambda表达式传入。
  3. 关闭connection和channel。

5. RabbitTemplate#doSend

public void doSend(
  Channel channel, String exchange, String routingKey, Message message,
  boolean mandatory, CorrelationData correlationData) throws Exception {
  if (exchange == null) {
    // try to send to configured exchange
    exchange = this.exchange;
  }
  if (routingKey == null) {
    // try to send to configured routing key
    routingKey = this.routingKey;
  }
  if (logger.isDebugEnabled()) {
    logger.debug("Publishing message " + message
                 + "on exchange [" + exchange + "], routingKey = [" + routingKey + "]");
  }

  Message messageToUse = message;
  MessageProperties messageProperties = messageToUse.getMessageProperties();
  if (mandatory) {
    messageProperties.getHeaders()
      .put(PublisherCallbackChannel.RETURN_CORRELATION_KEY, this.uuid);
  }
  if (this.beforePublishPostProcessors != null) {
    for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
      messageToUse = processor.postProcessMessage(messageToUse, correlationData);
    }
  }
  setupConfirm(channel, messageToUse, correlationData);
  if (this.userIdExpression != null && messageProperties.getUserId() == null) {
    String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
    if (userId != null) {
      messageProperties.setUserId(userId);
    }
  }
  sendToRabbit(channel, exchange, routingKey, mandatory, messageToUse);
  // Check if commit needed
  if (isChannelLocallyTransacted(channel)) {
    // Transacted channel created by this template -> commit.
    RabbitUtils.commitIfNecessary(channel);
  }
}
private void setupConfirm(Channel channel, Message message, CorrelationData correlationData) {
if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
  PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
  correlationData = this.correlationDataPostProcessor != null
    ? this.correlationDataPostProcessor.postProcess(message, correlationData)
    : correlationData;
  publisherCallbackChannel.addPendingConfirm(
    this, channel.getNextPublishSeqNo(),
    new PendingConfirm(correlationData, System.currentTimeMillis()));
}
}
protected void sendToRabbit(
  Channel channel, String exchange, String routingKey, boolean mandatory,
  Message message) throws IOException {
  BasicProperties convertedMessageProperties = this.messagePropertiesConverter
    .fromMessageProperties(message.getMessageProperties(), this.encoding);
  channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}

主要有以下步骤:

  1. 数据组装,在Message结构的基础上进行组装, 根据mandatory属性确定是否在header中加上相应的key-value。 根据beforePublishPostProcessors进行message的数据处理。setupConfirm方法中根据correlationDataPostProcessor进行correlationData的数据处理。 未配置userId为空时根据userIdExpression加上userId属性。
  2. 消息的发送,sendToRabbit中委托channel#basicPublish进行消息发送
  3. 提交, 如果channel配置了本地事务,通过RabbitUtils.commitIfNecessary执行channel#txCommit进行提交。

6. 消息发送确认

对应的rabbit配置:spring.rabbitmq.publisher-confirms: true

//RabbitAutConfiguration.RabbitConnectionFactoryCreator#rabbitConnectionFactory
map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);

rabbit支持消息发送的确认机制,当消息发送到broker时,broker会做出应答,确认消息已经被正确发送,但不确保消息被有效消费。

RabbitTemplate#setupConfirm方法用于处理CorrelationData参数,而该参数在rabbit中用于消息发送的确认机制。在该方法中通过publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(), new PendingConfirm(correlationData, System.currentTimeMillis()));添加了RabbitTemplate本身作为消息确认的监听器。

//PublisherCallbackChannelImpl#addPendingConfirm
@Override
public synchronized void addPendingConfirm(Listener listener, long seq, PendingConfirm pendingConfirm) {
  SortedMap<Long, PendingConfirm> pendingConfirmsForListener = this.pendingConfirms.get(listener);
  Assert.notNull(pendingConfirmsForListener,
                 "Listener not registered: " + listener + " " + this.pendingConfirms.keySet());
  pendingConfirmsForListener.put(seq, pendingConfirm);
  this.listenerForSeq.put(seq, listener);
}

broker做出应答后,进入ConfiremListener的handleAck或HandleNack方法

//PublisherCallbackChannelmpl
@Override
public void handleAck(long seq, boolean multiple)
  throws IOException {
  if (this.logger.isDebugEnabled()) {
    this.logger.debug(this.toString() + " PC:Ack:" + seq + ":" + multiple);
  }
  this.processAck(seq, true, multiple, true);
}

@Override
public void handleNack(long seq, boolean multiple)
  throws IOException {
  if (this.logger.isDebugEnabled()) {
    this.logger.debug(this.toString() + " PC:Nack:" + seq + ":" + multiple);
  }
  this.processAck(seq, false, multiple, true);
}

handleAck和handleNack都委托了processAck方法进行处理

private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {
  if (multiple) {
    /*
		  * Piggy-backed ack - extract all Listeners for this and earlier
		  * sequences. Then, for each Listener, handle each of it's acks.
		  * Finally, remove the sequences from listenerForSeq.
		  */
    Map<Long, Listener> involvedListeners = this.listenerForSeq.headMap(seq + 1);
    // eliminate duplicates
    Set<Listener> listeners = new HashSet<Listener>(involvedListeners.values());
    for (Listener involvedListener : listeners) {
      // find all unack'd confirms for this listener and handle them
      SortedMap<Long, PendingConfirm> confirmsMap = this.pendingConfirms.get(involvedListener);
      if (confirmsMap != null) {
        Map<Long, PendingConfirm> confirms = confirmsMap.headMap(seq + 1);
        Iterator<Entry<Long, PendingConfirm>> iterator = confirms.entrySet().iterator();
        while (iterator.hasNext()) {
          Entry<Long, PendingConfirm> entry = iterator.next();
          PendingConfirm value = entry.getValue();
          iterator.remove();
          doHandleConfirm(ack, involvedListener, value);
        }
      }
    }
    List<Long> seqs = new ArrayList<Long>(involvedListeners.keySet());
    for (Long key : seqs) {
      this.listenerForSeq.remove(key);
    }
  }
  else {
    Listener listener = this.listenerForSeq.remove(seq);
    if (listener != null) {
      SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);
      PendingConfirm pendingConfirm;
      if (remove) {
        pendingConfirm = confirmsForListener.remove(seq);
      }
      else {
        pendingConfirm = confirmsForListener.get(seq);
      }
      if (pendingConfirm != null) {
        doHandleConfirm(ack, listener, pendingConfirm);
      }
    }
    else {
      if (this.logger.isDebugEnabled()) {
        this.logger.debug(this.delegate.toString() + " No listener for seq:" + seq);
      }
    }
  }
}

private void doHandleConfirm(boolean ack, Listener listener, PendingConfirm pendingConfirm) {
  try {
    if (listener.isConfirmListener()) {
      if (this.logger.isDebugEnabled()) {
        this.logger.debug("Sending confirm " + pendingConfirm);
      }
      listener.handleConfirm(pendingConfirm, ack);
    }
  }
  catch (Exception e) {
    this.logger.error("Exception delivering confirm", e);
  }
}

最终还是交给Listener#handleConfirm处理

//RabbitTemplate
@Override
public void handleConfirm(PendingConfirm pendingConfirm, boolean ack) {
  if (this.confirmCallback != null) {
    this.confirmCallback.confirm(pendingConfirm.getCorrelationData(), ack, pendingConfirm.getCause());
  }
  else {
    if (logger.isDebugEnabled()) {
      logger.warn("Confirm received but no callback available");
    }
  }
}

用户可以通过ConfirmCallback的实现类并将其设置为RabbitTemplate的confirmCallback属性来实现自定义消息确认后的操作。

7. 消息不可达

在发送消息之后可能会存在消息不可达的场景:例如queue不存在的情况。 对应的配置文件内容:spring.rabbitmq.publisher-returns: true或者spring.rabbit.template.mandatory:true

//RabbitAuoConfiguration.RabbitConnectionFactoryCreator#rabbitConnectionFactory
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
//RabbitAuotConfiguration#determineMandatoryFlag
private boolean determineMandatoryFlag() {
  Boolean mandatory = this.properties.getTemplate().getMandatory();
  return (mandatory != null ? mandatory : this.properties.isPublisherReturns());
}

rabbit支持对不可大的消息进行监听,只需要将mandatory置为true。在发送过程中RabbitTemplate#doSend中也针对mandatory为true的场景做了特殊处理。

if (mandatory) {
  messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION_KEY, this.uuid);
}

通过ReturnListener#handleReturn来监听处理

//PublisherCallbackChannelImpl#handleReturn
@Override
public void handleReturn(int replyCode,
                         String replyText,
                         String exchange,
                         String routingKey,
                         AMQP.BasicProperties properties,
                         byte[] body) throws IOException {
  String uuidObject = properties.getHeaders().get(RETURN_CORRELATION_KEY).toString();
  Listener listener = this.listeners.get(uuidObject);
  if (listener == null || !listener.isReturnListener()) {
    if (this.logger.isWarnEnabled()) {
      this.logger.warn("No Listener for returned message");
    }
  }
  else {
    listener.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
  }
}

然后进入Listener#handleReturn

//RabbitTemplate#handleReturn
@Override
public void handleReturn(int replyCode,
                         String replyText,
                         String exchange,
                         String routingKey,
                         BasicProperties properties,
                         byte[] body)
  throws IOException {

  ReturnCallback returnCallback = this.returnCallback;
  if (returnCallback == null) {
    Object messageTagHeader = properties.getHeaders().remove(RETURN_CORRELATION_KEY);
    if (messageTagHeader != null) {
      String messageTag = messageTagHeader.toString();
      final PendingReply pendingReply = this.replyHolder.get(messageTag);
      if (pendingReply != null) {
        returnCallback = (message, replyCode1, replyText1, exchange1, routingKey1) ->
          pendingReply.returned(new AmqpMessageReturnedException(
            "Message returned",
            message, replyCode1, replyText1, exchange1, routingKey1));
      }
      else if (logger.isWarnEnabled()) {
        logger.warn("Returned request message but caller has timed out");
      }
    }
    else if (logger.isWarnEnabled()) {
      logger.warn("Returned message but no callback available");
    }
  }
  if (returnCallback != null) {
    properties.getHeaders().remove(PublisherCallbackChannel.RETURN_CORRELATION_KEY);
    MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(
      properties, null, this.encoding);
    Message returnedMessage = new Message(body, messageProperties);
    returnCallback.returnedMessage(returnedMessage,
                                   replyCode, replyText, exchange, routingKey);
  }
}

8. 发送端确认机制总结