MQ中消息重复消费及解决

/ MQ / 没有评论 / 1618浏览

MQ中消息重复消费及解决

重复消费原因

因为在网络延迟的情况下,消息重复发送的问题不可避免的发生,如果非要实现消息不可重复发送,那基本太难,因为网络环境无法预知,还会使程序复杂度加大,因此默认允许消息重复发送。 只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

重复消费问题例子

RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。

RabbitMQ

RabbitMQ 不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

AMQP 消费者确认机制

AMQP 定义了消费者确认机制(message ack),如果一个消费者应用崩溃掉(此时连接会断掉,broker 会得知),但是 broker 尚未获得 ack,那么消息会被重新放入队列。所以 AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。

消息重复发布:不存在,因为 AMQP 定义了事务(tx transaction)来确保生产消息被 broker 接收并成功入队。TX 事务是阻塞调用,生产者需等待 broker 写磁盘后返回的确认,之后才能继续发送消息。事务提交失败时(如 broker 宕机场景),broker 并不保证提交的消息全部入队。RabbitMQ 使用 confirm 机制来优化生产消息的确认(可以持续发布消息,但会批量回复确认)。 消息重复消费:AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。

Kafka

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下。 假如,有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,分配的 offset 依次是 152,153,154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1,2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 把上次消费到的那个地方后面的数据继续传递过来。数据 1,2 再次被消费。

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1,2 在数据库里插入了 2 次,那么数据就错啦。其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

解决方案

消费端处理消息的业务逻辑保持幂等性。幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

  1. 比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  2. 再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
  3. 如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。