Spring-RabbitMQ Consumer踩坑经历

/ MQ / 没有评论 / 2264浏览

Spring-RabbitMQ Consumer踩坑经历

问题1: Consumer假死,无真正的消费能力

背景 spring-rabbit 版本变更至 1.6.2.RELEASE

现象 consumer数量正常,mq控制面板的 prefetch 参数始终是1, 消息无法正常ack, 队列处于假死状态 系统报异常org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException

效果图

[2018-09-09 10:31:27.27]RuntimeException-org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException [ ERROR] [ Elog ]
Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpIllegalStateException: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method.
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:291)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
... 10 more

异常原因 rabbit:listener的ID 和 真实consumer的ID 冲突,导致真实的consumer Bean无效,找不到接受消息的方法,而rabbitmq在与clien端通信过程中发生异常,会停止消费。

<bean id="consumer_1" class="me.ele.Consumer1"/>
<rabbit:listener-container connection-factory="galaxyConnectionFactory" acknowledge="manual" concurrency="16" >
    <rabbit:listener queues="queue_1" ref="consumer_1" id="consumer_1" />
</rabbit:listener-container

解决办法

删除rabbit:listener 的ID属性

问题2:consumer数量异常

背景 原服务有6个队列,每个队列起10个consumer, task-executor线程池设置为90, 后因数据量增加,发现消费能力不足,决定增加consumer数量,调整listener-container的concurrency为20,重启服务器。

现象 部分队列consumer数量不足,缺失项始终为xml中声明在后的队列

效果图

异常定位 回退concurrency参数为3异常消失,观察异常现场,发现consumer消失队列有先后顺序之分,且consumer数量存在上限值为90,猜测是收参数限制,检查配置参数如下:

    <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
      id="taskExecutor">
        <!--核心线程数 -->
        <property name="corePoolSize" value="16"/>
        <!--最大线程数 -->
        <property name="maxPoolSize" value="16"/>
        <property name="queueCapacity" value="500"/>
        <!--线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="60"/>
        <!--线程池对拒绝任务(无线程可用)的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
        <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
    </bean>
    
    <rabbit:listener-container connection-factory="monitorConnectionFactory" 
            acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20">
        <rabbit:listener queues="queue_1" ref="consumer_1"/>
        <rabbit:listener queues="queue_2" ref="consumer_2"/>
        <rabbit:listener queues="queue_3" ref="consumer_3"/>
        <rabbit:listener queues="queue_4" ref="consumer_4"/>
        <rabbit:listener queues="queue_5" ref="consumer_5"/>
        <rabbit:listener queues="queue_6" ref="consumer_6"/>
    </rabbit:listener-container>
    
复制代码

异常原因 经排查对比,发现上限值与task-executor ThreadPoolTaskExecutor参数corePoolSize、maxPoolSize极为接近,查询相关资料发现,多个queue的consumer会共用taskExecutor的线程池数量,如果线程池数量不足,consumer无法创建

解决办法 增大task-executor corePoolSize和maxPoolSize的值为200,重启服务,解决。

总结

  1. 在spring容器管理中,ID是一个对象的引用,必须仅且只有一个无重复的ID, 包含Bean及其他自定义标签(如rabbit:listener), 即使在不同文件也需要注意。
  2. 对共享bean的使用,需额外注意共享bean的资源分配。