Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer
前文
- Springboot RabbitMq源码解析之配置类
- Springboot RabbitMq源码解析之消息发送
一、MessageListenerContainer
在Springboot RabbitMq中,接口MessageListenerContainer负责接收并处理消息。
public interface MessageListenerContainer extends SmartLifecycle {
/**
* Setup the message listener to use. Throws an {@link IllegalArgumentException}
* if that message listener type is not supported.
* @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
*/
void setupMessageListener(Object messageListener);
/**
* @return the {@link MessageConverter} that can be used to
* convert {@link org.springframework.amqp.core.Message}, if any.
*/
MessageConverter getMessageConverter();
}
从前面的配置类RabbitAnnotationDrivenConfiguration中也能看到,springboot主要支持simple和direct两种方式进行消息的消费,在这里,以SimpleMessageListenerContainer为例查看消息是如何被接收处理的。
二、SimpleMessageListenerContainer启动
1. AbstractMessageListenerContainer#start
首先,MessageListenerContainer继承了SmartLifecycle,因此我们从start方法开始作为介入点进入源码进行查看。
/**
* Start this container.
* @see #doStart
*/
@Override
public void start() {
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
this.initialized = true;
}
}
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Rabbit listener container.");
}
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}
SimpleMessageListenerContainer本身并没有重写start方法,其start方法从父类AbstractMessageListenerContainer继承而来,在AbstractMessageContainer#start方法中,主要进行了包括afterPropertiesSet, configureAdminIfNeeded, checkMismatchedQueues方法在内的一些初始化工作,剩下了工作委托给doStart方法进行处理,方便扩展。
2. SimpleMessageListenerContainer#doStart
SimpleMessageListenerContainer#start方法的目的是校验通过后将消费者consumer提交到线程池taskExecutors,主要逻辑分为3步:
- messageListener是ListenerContainerAware时的场景的处理,检查queueNames是否和messsgeListener的expectedQueueNames一致。
if (getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
- 通过super.doStart();初始化active和running属性,并唤醒所有相关线程。
/**
* Start this container, and notify all invoker tasks.
* @throws Exception if thrown by Rabbit API methods
*/
protected void doStart() throws Exception {
// Reschedule paused tasks, if any.
synchronized (this.lifecycleMonitor) {
this.active = true;
this.running = true;
this.lifecycleMonitor.notifyAll();
}
}
- 初始化consumer并提交
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
3.1. 初始化consumer在这儿初始化的consumer为BlockingQueueConsumer类型,consumer数量根据concurrentConsumers而来,默认为1。每个consumer的属性根据SimpleMessageListenerContainer而来。
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
count++;
}
}
}
return count;
}
protected BlockingQueueConsumer createBlockingQueueConsumer() {
BlockingQueueConsumer consumer;
String[] queues = getQueueNames();
// There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
// didn't get an ack for delivered messages
int actualPrefetchCount = getPrefetchCount() > this.txSize ? getPrefetchCount() : this.txSize;
consumer = new BlockingQueueConsumer(
getConnectionFactory(), getMessagePropertiesConverter(),
this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
if (this.declarationRetries != null) {
consumer.setDeclarationRetries(this.declarationRetries);
}
if (getFailedDeclarationRetryInterval() > 0) {
consumer.setFailedDeclarationRetryInterval(getFailedDeclarationRetryInterval());
}
if (this.retryDeclarationInterval != null) {
consumer.setRetryDeclarationInterval(this.retryDeclarationInterval);
}
if (getConsumerTagStrategy() != null) {
consumer.setTagStrategy(getConsumerTagStrategy());
}
consumer.setBackOffExecution(getRecoveryBackOff().start());
consumer.setShutdownTimeout(getShutdownTimeout());
consumer.setApplicationEventPublisher(getApplicationEventPublisher());
return consumer;
}
3.2. 将BlockingQueueConsumer封装成AsyncMessageProcessingConsumer并提交到线程池,同时检查启动结果。SimpleMessageListenerContainer#doStart通过AsyncMessageProcessingConsumer#getStartupException检查AsyncMessageProcessingConsumer的启动结果。
三、AsyncMessageProcessingConsumer
AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类,是一个Runnable类,有3个成员变量,分别为:
private final BlockingQueueConsumer consumer;
private final CountDownLatch start;
private volatile FatalListenerStartupException startupException;
AsyncMessageProcessingConsumer由BlockingQueueConsumer封装而来,也只有AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer)一个构造函数,start是一个CountDownLatch变量,值为1,启动结束后通过this.start.countDown()变化为0。startupException用于保存run方法执行过程中抛出的特定异常。
1. AsyncMessageProcessingConsumer#getStartupException
AsyncMessageProcessingConsumer#getStartupException用于在启动结束后返回执行过程中出现的特定异常。事实上,这里的校验包括了两种情况:
- 规定时间内没有启动或者被中断,抛出异常。 这儿抛出的异常来自于判断条件,因为start在构造方法内初始化为1,启动结束后为0。
- 启动过程中会针对一部分特定的异常进行处理,如果捕捉到了这部分异常,会将其赋值给startupException,然后在getStartupException方法中返回。
private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
if (!this.start.await(
SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
logger.error("Consumer failed to start in "
+ SimpleMessageListenerContainer.this.consumerStartTimeout
+ " milliseconds; does the task executor have enough threads to support the container "
+ "concurrency?");
}
return this.startupException;
}
2. AsyncMessageProcessingConsumer#run
@Override
public void run() {
//1.active校验
if (!isActive()) {
return;
}
//2. 初始化
boolean aborted = false;
int consecutiveIdles = 0;
int consecutiveMessages = 0;
this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
String routingLookupKey = getRoutingLookupKey();
if (routingLookupKey != null) {
SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey);
}
//3. queue数量校验
if (this.consumer.getQueueCount() < 1) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer stopping; no queues for " + this.consumer);
}
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
this.start.countDown();
return;
}
try {
try {
//4. 通过redeclareElementsIfNecessary方法进行重新声明和定义元素
redeclareElementsIfNecessary();
//5. consumer启动
this.consumer.start();
this.start.countDown();
}
catch (QueuesNotAvailableException e) {
if (isMissingQueuesFatal()) {
throw e;
}
else {
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw e;
}
}
catch (FatalListenerStartupException ex) {
if (isPossibleAuthenticationFailureFatal()) {
throw ex;
}
else {
Throwable possibleAuthException = ex.getCause().getCause();
if (possibleAuthException == null ||
!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
throw ex;
}
else {
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw possibleAuthException;
}
}
}
catch (Throwable t) { //NOSONAR
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw t;
}
if (getTransactionManager() != null) {
/*
* Register the consumer's channel so it will be used by the transaction manager
* if it's an instance of RabbitTransactionManager.
*/
ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
}
//6.接收并处理消息
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}
else {
consecutiveMessages = 0;
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessageAlert
.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
* These will normally be wrapped by an LEFE if thrown by the
* listener, but we will also honor it if thrown by an
* error handler.
*/
}
}
}
catch (InterruptedException e) {
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
}
catch (QueuesNotAvailableException ex) {
logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
if (isMissingQueuesFatal()) {
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
}
catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
}
catch (PossibleAuthenticationFailureException ex) {
logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
" exception during processing", ex);
if (isPossibleAuthenticationFailureFatal()) {
this.startupException =
new FatalListenerStartupException("Authentication failure",
new AmqpAuthenticationException(ex));
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
}
}
else {
logConsumerException(e);
}
}
catch (AmqpIOException e) {
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
getExclusiveConsumerExceptionLogger().log(logger,
"Exclusive consumer failure", e.getCause().getCause());
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
}
else {
logConsumerException(e);
}
}
catch (Error e) { //NOSONAR
// ok to catch Error - we're aborting so will stop
logger.error("Consumer thread error, thread abort.", e);
publishConsumerFailedEvent("Consumer threw an Error", true, e);
aborted = true;
}
catch (Throwable t) { //NOSONAR
// by now, it must be an exception
if (isActive()) {
logConsumerException(t);
}
}
finally {
if (getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
//7. 确保启动结束的标志start正确
// In all cases count down to allow container to progress beyond startup
this.start.countDown();
//8. 终止或重启consumer
if (!isActive(this.consumer) || aborted) {
logger.debug("Cancelling " + this.consumer);
try {
this.consumer.stop();
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
}
catch (AmqpException e) {
logger.info("Could not cancel message consumer", e);
}
if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
.compareAndSet(null, Thread.currentThread())) {
logger.error("Stopping container from aborted consumer");
stop();
SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
ListenerContainerConsumerFailedEvent event = null;
do {
try {
event = SimpleMessageListenerContainer.this.abortEvents.poll(5, TimeUnit.SECONDS);
if (event != null) {
SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
event.getReason(), event.isFatal(), event.getThrowable());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
while (event != null);
}
}
else {
logger.info("Restarting " + this.consumer);
restart(this.consumer);
}
if (routingLookupKey != null) {
SimpleResourceHolder.unbind(getRoutingConnectionFactory());
}
}
run方法很长,但除去异常处理的相关代码之后的逻辑也并不是很复杂,大致可以按以下步骤理解:
- active校验校验
- 变量初始化。一些变量的初始化,包括是否启动事务、路由的key值。consecutiveMessages表示连续接收到的消息数量, consecutiveIdles表示连续失败的数量
- queue数量校验
- 通过redeclareElementsIfNecessary方法进行重新声明和定义元素。自动删除队列会导致绑定关系、路由器等元素也被删除,因此有可能需要重新申明和定义
- consumer启动。正常的业务场景下,这一步执行结束表示了AyncMessageProcessingCosnumer启动完成。 通过consumer的start方法进行consumer的启动。
- 接收并处理消息: 这一步是一个循环.
- 在consumer可用的状态(consumer有效且queue不为空且没有取消)下循环接收消息并委托给receiveAndExecute方法进行处理。
- 如果接收并处理成功,当consecutiveMessages大于触发器consecutiveActiveTrigger,触发considerAddingAConsumer方法在符合条件的情况下增加consumer数量
- 否则,当consecutiveIdles大于触发器consecutiveIdleTrigger,触发considerStoppingAConsumer在符合条件的情况下终止consumer。
- 根据时间间隔判断失败的场景下是否需要发布ListenerContainerIdleEvent。
- 确保启动结束的标志start正确
- 终止或重启consumer。根据consumer是否有效和是否被中断对consumer进行相应的终止或重启操作。
四、 消息接收与处理
1. SimpleMessageListenerContainer#receiveAndExecute
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
if (getTransactionManager() != null) {
try {
if (this.transactionTemplate == null) {
this.transactionTemplate =
new TransactionTemplate(getTransactionManager(), getTransactionAttribute());
}
return this.transactionTemplate
.execute(status -> {
RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
new RabbitResourceHolder(consumer.getChannel(), false),
getConnectionFactory(), true);
// unbound in ResourceHolderSynchronization.beforeCompletion()
try {
return doReceiveAndExecute(consumer);
}
catch (RuntimeException e1) {
prepareHolderForRollback(resourceHolder, e1);
throw e1;
}
catch (Throwable e2) { //NOSONAR
// ok to catch Throwable here because we re-throw it below
throw new WrappedTransactionException(e2);
}
});
}
catch (WrappedTransactionException e) {
throw e.getCause();
}
}
return doReceiveAndExecute(consumer);
}
receiveAndExecute委托doReceiveAndExecute方法进行具体的业务逻辑,如果配置了事务,在抛出异常的场景下由prepareHolderForRollback方法进行回滚。
2. SimpleMessageListenerContainer#doReveiveAndExecute
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
Channel channel = consumer.getChannel();
for (int i = 0; i < this.txSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
try {
executeListener(channel, message);
}
catch (ImmediateAcknowledgeAmqpException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery '"
+ e.getMessage() + "': "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
catch (Throwable ex) { //NOSONAR
if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery: "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
if (getTransactionManager() != null) {
if (getTransactionAttribute().rollbackOn(ex)) {
RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(getConnectionFactory());
if (resourceHolder != null) {
consumer.clearDeliveryTags();
}
else {
/*
* If we don't actually have a transaction, we have to roll back
* manually. See prepareHolderForRollback().
*/
consumer.rollbackOnExceptionIfNecessary(ex);
}
throw ex; // encompassing transaction will handle the rollback.
}
else {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No rollback for " + ex);
}
break;
}
}
else {
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
}
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
doReceiveAndExecute又委托了executeListener(channel, message);进行处理,剩余部分的代码都是针对事务的处理。
3. SimpleMessageListenerContainer#nextMessage
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
if (logger.isTraceEnabled()) {
logger.trace("Retrieving delivery for " + this);
}
checkShutdown();
if (this.missingQueues.size() > 0) {
checkMissingQueues();
}
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
throw new ConsumerCancelledException();
}
return message;
}
private Message handle(Delivery delivery) throws InterruptedException {
if ((delivery == null && this.shutdown != null)) {
throw this.shutdown;
}
if (delivery == null) {
return null;
}
byte[] body = delivery.getBody();
Envelope envelope = delivery.getEnvelope();
MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(
delivery.getProperties(), envelope, "UTF-8");
messageProperties.setConsumerTag(delivery.getConsumerTag());
messageProperties.setConsumerQueue(this.consumerTags.get(delivery.getConsumerTag()));
Message message = new Message(body, messageProperties);
if (logger.isDebugEnabled()) {
logger.debug("Received message: " + message);
}
this.deliveryTags.add(messageProperties.getDeliveryTag());
if (this.transactional && !this.locallyTransacted) {
ConnectionFactoryUtils.registerDeliveryTag(
this.connectionFactory, this.channel,delivery.getEnvelope().getDeliveryTag());
}
return message;
}
nextMessage方法用于从rabbit中拉去数据然后组装成message格式。
4. AbstractMessageListenerContainer#executeListener
protected void executeListener(Channel channel, Message messageIn) throws Exception {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
}
throw new MessageRejectedWhileStoppingException();
}
try {
Message message = messageIn;
if (this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
message = processor.postProcessMessage(message);
if (message == null) {
throw new ImmediateAcknowledgeAmqpException(
"Message Post Processor returned 'null', discarding message");
}
}
}
Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat) && this.deBatchingEnabled) {
ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
while (byteBuffer.hasRemaining()) {
int length = byteBuffer.getInt();
if (length < 0 || length > byteBuffer.remaining()) {
throw new ListenerExecutionFailedException(
"Bad batched message received",
new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
message);
}
byte[] body = new byte[length];
byteBuffer.get(body);
messageProperties.setContentLength(length);
// Caveat - shared MessageProperties.
Message fragment = new Message(body, messageProperties);
invokeListener(channel, fragment);
}
}
else {
invokeListener(channel, message);
}
}
catch (Exception ex) {
if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
if (this.statefulRetryFatalWithNullMessageId) {
throw new FatalListenerExecutionException(
"Illegal null id in message. Failed to manage retry for message: " + messageIn);
}
else {
throw new ListenerExecutionFailedException(
"Cannot retry message more than once without an ID",
new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
messageIn);
}
}
handleListenerException(ex);
throw ex;
}
}
可以看出executeListener方法内只是控制了业务流程,校验通过后,由后处理器MessagePostProcessor进行预处理,对批量型的springBatchFormat进行message处理,委托invokeListener方法进行具体的业务代码实现,如果出现异常,由handleListenerException处理异常。
5. AbstractMessageListenerContainer#invokeListener
protected void invokeListener(Channel channel, Message message) throws Exception {
this.proxy.invokeListener(channel, message);
}
AbstractMessageListenerContainer#invokeListener方法最终是执行proxy的invokeListener方法,查看proxy的属性值。
private final ContainerDelegate delegate = this::actualInvokeListener;
private ContainerDelegate proxy = this.delegate;
可以看出,最终回到了AbstractMessageListenerContainer#actualInvokeListener方法。
6. AbstractMessageListenerContainer#actualInvokeListener
protected void actualInvokeListener(Channel channel, Message message) throws Exception {
Object listener = getMessageListener();
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
}
else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
if (bindChannel) {
RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(
this.getConnectionFactory(),resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, message);
}
finally {
if (bindChannel) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
}
}
}
else if (listener != null) {
throw new FatalListenerExecutionException(
"Only MessageListener and SessionAwareMessageListener supported: "
+ listener);
}
else {
throw new FatalListenerExecutionException(
"No message listener specified - see property 'messageListener'");
}
}
由于AbstractMessageListener的属性messageListener是Object类型的,因此在处理记得逻辑时需要根据messageListener的真实类型做不同处理。
- ChannelAwareMessageListener类型: 委托doInvokeListener((ChannelAwareMessageListener) listener, channel, message);方法处理。
- MessageListener类型: 委托doInvokeListener((MessageListener) listener, message);方法处理,另外根据需要进行事务资源的绑定与解绑。
- 不存在或者其它类型: 抛出异常
7. AbstractMessageListenerContainer#doInvokeListener(MessageListener, Message)
protected void doInvokeListener(MessageListener listener, Message message) throws Exception {
try {
listener.onMessage(message);
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
}
}
对于MessageListener类型的监听器,由MessageListener#onMessage处理即可。
8.AbstractMessageListenerContainer#doInvokeListener(ChannelAwareMessageListener, Channel, Message)
/**
* Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially
* with its own transaction) to the listener if demanded.
* @param listener the Spring ChannelAwareMessageListener to invoke
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @throws Exception if thrown by Rabbit API methods or listener itself.
* <p>
* Exception thrown from listener will be wrapped to {@link ListenerExecutionFailedException}.
* @see ChannelAwareMessageListener
* @see #setExposeListenerChannel(boolean)
*/
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
throws Exception {
RabbitResourceHolder resourceHolder = null;
Channel channelToUse = channel;
boolean boundHere = false;
try {
if (!isExposeListenerChannel()) {
// We need to expose a separate Channel.
resourceHolder = getTransactionalResourceHolder();
channelToUse = resourceHolder.getChannel();
/*
* If there is a real transaction, the resource will have been bound; otherwise
* we need to bind it temporarily here. Any work done on this channel
* will be committed in the finally block.
*/
if (isChannelLocallyTransacted() &&
!TransactionSynchronizationManager.isActualTransactionActive()) {
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
boundHere = true;
}
}
else {
// if locally transacted, bind the current channel to make it available to RabbitTemplate
if (isChannelLocallyTransacted()) {
RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false);
localResourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
localResourceHolder);
boundHere = true;
}
}
// Actually invoke the message listener...
try {
listener.onMessage(message, channelToUse);
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
}
}
finally {
if (resourceHolder != null && boundHere) {
// so the channel exposed (because exposeListenerChannel is false) will be closed
resourceHolder.setSynchronizedWithTransaction(false);
}
ConnectionFactoryUtils.releaseResources(resourceHolder);
if (boundHere) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
if (!isExposeListenerChannel() && isChannelLocallyTransacted()) {
/*
* commit the temporary channel we exposed; the consumer's channel
* will be committed later. Note that when exposing a different channel
* when there's no transaction manager, the exposed channel is committed
* on each message, and not based on txSize.
*/
RabbitUtils.commitIfNecessary(channelToUse);
}
}
}
}
对于ChannelAwareMessageListener类型的监听器,除去事务的处理以外,同样也是交给ChannelAwareMessageListener#onMessage进行处理。
本文由 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。最后编辑时间为: 2021/07/07 09:30