Spring-RabbitMQ Consumer踩坑经历
问题1: Consumer假死,无真正的消费能力
背景 spring-rabbit 版本变更至 1.6.2.RELEASE
现象 consumer数量正常,mq控制面板的 prefetch 参数始终是1, 消息无法正常ack, 队列处于假死状态 系统报异常org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException
[2018-09-09 10:31:27.27]RuntimeException-org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException [ ERROR] [ Elog ]
Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpIllegalStateException: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method.
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:291)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
... 10 more
异常原因 rabbit:listener的ID 和 真实consumer的ID 冲突,导致真实的consumer Bean无效,找不到接受消息的方法,而rabbitmq在与clien端通信过程中发生异常,会停止消费。
<bean id="consumer_1" class="me.ele.Consumer1"/>
<rabbit:listener-container connection-factory="galaxyConnectionFactory" acknowledge="manual" concurrency="16" >
<rabbit:listener queues="queue_1" ref="consumer_1" id="consumer_1" />
</rabbit:listener-container
解决办法
删除rabbit:listener 的ID属性
问题2:consumer数量异常
背景 原服务有6个队列,每个队列起10个consumer, task-executor线程池设置为90, 后因数据量增加,发现消费能力不足,决定增加consumer数量,调整listener-container的concurrency为20,重启服务器。
现象 部分队列consumer数量不足,缺失项始终为xml中声明在后的队列
异常定位 回退concurrency参数为3异常消失,观察异常现场,发现consumer消失队列有先后顺序之分,且consumer数量存在上限值为90,猜测是收参数限制,检查配置参数如下:
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
id="taskExecutor">
<!--核心线程数 -->
<property name="corePoolSize" value="16"/>
<!--最大线程数 -->
<property name="maxPoolSize" value="16"/>
<property name="queueCapacity" value="500"/>
<!--线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="60"/>
<!--线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
<rabbit:listener-container connection-factory="monitorConnectionFactory"
acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20">
<rabbit:listener queues="queue_1" ref="consumer_1"/>
<rabbit:listener queues="queue_2" ref="consumer_2"/>
<rabbit:listener queues="queue_3" ref="consumer_3"/>
<rabbit:listener queues="queue_4" ref="consumer_4"/>
<rabbit:listener queues="queue_5" ref="consumer_5"/>
<rabbit:listener queues="queue_6" ref="consumer_6"/>
</rabbit:listener-container>
复制代码
异常原因 经排查对比,发现上限值与task-executor ThreadPoolTaskExecutor参数corePoolSize、maxPoolSize极为接近,查询相关资料发现,多个queue的consumer会共用taskExecutor的线程池数量,如果线程池数量不足,consumer无法创建
解决办法 增大task-executor corePoolSize和maxPoolSize的值为200,重启服务,解决。
总结
- 在spring容器管理中,ID是一个对象的引用,必须仅且只有一个无重复的ID, 包含Bean及其他自定义标签(如rabbit:listener), 即使在不同文件也需要注意。
- 对共享bean的使用,需额外注意共享bean的资源分配。
本文由 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。最后编辑时间为: 2021/06/22 08:59