RabbitMq从入门到精通-ConfirmCallback ReturnCallback 区别及使用

/ MQ / 没有评论 / 1694浏览

RabbitMq从入门到精通-ConfirmCallback ReturnCallback 区别及使用

消息发送确认

发送的消息怎么样才算失败或成功?如何确认?当消息无法路由到队列时,确认消息路由失败。消息成功路由时,当需要发送的队列都发送成功后,进行确认消息,对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功.

ConfirmCallback

通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @PostConstruct
  public void init(){
    rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
  }

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    System.out.println("消息唯一标识:"+correlationData);
    System.out.println("确认结果:"+ack);
    System.out.println("失败原因:"+cause);
  }
}

还需要在配置文件添加配置

spring:
  rabbitmq:
    publisher-confirms: true 

ReturnCallback

通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @PostConstruct
  public void init(){
    rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
  }

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println("消息主体 message : "+message);
    System.out.println("消息主体 message : "+replyCode);
    System.out.println("描述:"+replyText);
    System.out.println("消息使用的交换器 exchange : "+exchange);
    System.out.println("消息使用的路由键 routiAng : "+routingKey);
  }
}

还需要在配置文件添加配置

spring:
  rabbitmq:
    publisher-returns: true 

消息接收确认

消息消费者如何通知 Rabbit 消息消费成功?

确认消息(局部方法处理消息)

默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

或在 RabbitListenerContainerFactory 中进行开启手动 ack

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory);
  factory.setMessageConverter(new Jackson2JsonMessageConverter());
  factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
  return factory;
}

确认消息

@RabbitHandler
public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
  System.out.println(message);
  try {
    channel.basicAck(tag,false);            // 确认消息
  } catch (IOException e) {
    e.printStackTrace();
  }
}

需要注意的 basicAck 方法需要传递两个参数

手动否认、拒绝消息

发送一个 header 中包含 error 属性的消息 1

消费者获取消息时检查到头部包含 error 则 nack 消息

@RabbitHandler
public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
  System.out.println(message);
  if (map.get("error")!= null){
    System.out.println("错误的消息");
    try {
      channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息
      return;
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  try {
    channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息
  } catch (IOException e) {
    e.printStackTrace();
  }
}

此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费

hello
错误的消息
hello
错误的消息
hello
错误的消息
hello
错误的消息

也可以拒绝该消息,消息会被丢弃,不会重回队列

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

确认消息(全局处理消息)

自动确认涉及到一个问题就是如果在处理消息的时候抛出异常,消息处理失败,但是因为自动确认而导致 Rabbit 将该消息删除了,造成消息丢失

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames("consumer_queue");                 // 监听的队列
  container.setAcknowledgeMode(AcknowledgeMode.NONE);     // NONE 代表自动确认
  container.setMessageListener((MessageListener) message -> {         //消息监听处理
    System.out.println("====接收到消息=====");
    System.out.println(new String(message.getBody()));
    //相当于自己的一些消费逻辑抛错误
    throw new NullPointerException("consumer fail");
  });
  return container;
}

手动确认消息

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames("consumer_queue");              // 监听的队列
  container.setAcknowledgeMode(AcknowledgeMode.MANUAL);        // 手动确认
  container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {      //消息处理
    System.out.println("====接收到消息=====");
    System.out.println(new String(message.getBody()));
    if(message.getMessageProperties().getHeaders().get("error") == null){
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      System.out.println("消息已经确认");
    }else {
      //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
      System.out.println("消息拒绝");
    }
  });
  return container;
}

AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames("consumer_queue");              // 监听的队列
  container.setAcknowledgeMode(AcknowledgeMode.AUTO);     // 根据情况确认消息
  container.setMessageListener((MessageListener) (message) -> {
    System.out.println("====接收到消息=====");
    System.out.println(new String(message.getBody()));
    //抛出NullPointerException异常则重新入队列
    //throw new NullPointerException("消息消费失败");
    //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
    //throw new AmqpRejectAndDontRequeueException("消息消费失败");
    //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
    throw new ImmediateAcknowledgeAmqpException("消息消费失败");
  });
  return container;
}

消息可靠总结

  1. exchange要持久化
  2. queue要持久化
  3. message要持久化
  1. 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
  2. 生产者和Server(broker)之间的消息确认
  3. 消费者和Server(broker)之间的消息确认