Ribbon对于SocketTimeOutException重试的坑以及重试代码解析

/ Java / 没有评论 / 1648浏览

Ribbon对于SocketTimeOutException重试的坑以及重试代码解析

背景

本文基于Spring-Cloud, Daltson SR4

微服务一般多实例部署,在发布的时候,我们要做到无感知发布;微服务调用总会通过Ribbon,同时里面会实现一些重试的机制,相关配置是:

#最多重试多少台服务器
ribbon.MaxAutoRetriesNextServer=2
#每台服务器最多重试次数,但是首次调用不包括在内
ribbon.MaxAutoRetries=11234

在发布时,为了适应Eureka注册中心的注册信息变换(参考Eureka上线下线解析),我们挨个重启实例,并且在每个实例启动后等待一段时间((Eureka客户端注册信息刷新时间+Eureka客户端Ribbon刷新事件)*3)再重启另外一个实例,来避免注册信息变化带来的影响,这样这个被重启的实例的微服务的调用方总能负载均衡重试调用到可用的实例。

但是,实际生产中,我们发现,某个实例重启其他实例正常工作时,会有一小段时间,调用方调用到被重启的实例,直接失败,没有触发重试。

代码分析

无论上层是Feign调用还是Zuul调用,到了Ribbon这一层都是创建一个LoadBalancerCommand,调用其中的submit方法执行http请求,这里利用了RxJava机制:

public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //这里就是读取上面说的配置最多重试多少台服务器以及每台服务器最多重试次数
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // 利用RxJava生成一个Observable用于后面的回调
    Observable<T> o = 
            //选择一个server进行调用
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    //获取这个server调用监控记录,用于各种统计和LoadBalanceRule的筛选server处理
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //获取本次server调用的回调入口,用于重试同一实例的重试回调
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);

                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            recordStats(tracer, stats, entity, null);
                                            // TODO: What to do if onNext or onError are never called?
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });
                    //设置针对同一实例的重试回调
                    if (maxRetrysSame > 0) 
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    //设置重试下一个实例的回调    
    if (maxRetrysNext > 0 && server == null) 
        o = o.retry(retryPolicy(maxRetrysNext, false));
    //设置重试超过次数则终止调用并设置对应异常的回调
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

我们重点看一下设置重试的回调的详细回调代码:

private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2<Integer, Throwable, Boolean>() {
        //只有返回为true的时候才会retry
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            //抛出的异常是AbortExecutionException则不重试
            if (e instanceof AbortExecutionException) {
                return false;
            }

            //超过最大重试次数则不重试
            if (tryCount > maxRetrys) {
                return false;
            }

            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            //判断是否是可以重试的exception
            return retryHandler.isRetriableException(e, same);
        }
    };
}

这个判断是否是可以重试的exception里面的逻辑是:

public boolean isRetriableException(Throwable e, boolean sameServer)
{
    //如果已经配置了ribbon.okToRetryOnAllErrors为true,则不论什么异常都会重试,我们没有这么配置,一般也不会这么配置
    if (okToRetryOnAllErrors)
    {
        return true;
    }
    else if (e instanceof ClientException)
    {
        ClientException ce = (ClientException) e;
        if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED)
        {
            return !sameServer;
        }
        else
        {
            return false;
        }
    }
    else
    {

        if (e instanceof RetryableHttpCodeAndMethodException)
        {
         //如果是有response返回的异常就会到这里
            if (((RetryableHttpCodeAndMethodException) e).getMethod().equals("GET") || okToRetryOnAllOperations)
                return true;
            return false;
        }
        //其他情况,就是连接失败的判断。首先需要配置ribbon.okToRetryOnConnectErrors为true,这个默认就是true;然后通过isConnectionException判断
        return okToRetryOnConnectErrors && isConnectionException(e);
    }
}

最后,我们来看看如何判断一个Exception为ConnectionException:

protected List<Class<? extends Throwable>> connectionRelated = Lists
            .<Class<? extends Throwable>> newArrayList(SocketException.class);
public boolean isConnectionException(Throwable e)
{
    return Utils.isPresentAsCause(e, connectionRelated);
}

这个方法其实就看这个异常的异常以及Cause中是否有SocketException,如果有则返回true。

问题定位

在Windows环境下调试,我们发现一个有意思的现象,当我们设置ribbon连接超时 ribbon.ConnectTimeout=500时(这个和我们线上配置一样),重试失败,捕获到“java.net.SocketTimeoutException: connect timed out”这个Exception;当设置连接超时为1000ms以上时(不包括1000),抛出的异常就是“java.net.ConnectException: Connection refused: connect”

我们写一段测试代码看一下:

 public static void main(String[] args) throws IOException {
    Socket socket = new Socket();
    try {
        socket.connect(new InetSocketAddress("127.0.0.1", 8080), 500);
    } catch (Exception e) {
        e.printStackTrace();
    }
    socket = new Socket();
    socket.connect(new InetSocketAddress("127.0.0.1", 8080), 1100);

}

这个端口没有启用,输出为:

java.net.SocketTimeoutException: connect timed out
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.hash.test.TestRxJava.main(TestRxJava.java:14)
Exception in thread "main" java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.hash.test.TestRxJava.main(TestRxJava.java:19)

就是不一样的Exception

而SocketTimeoutException不是一种SocketException,所以,原有的重试逻辑不能重试。

对于这个问题,我在Feign的github源代码库提了个issue

所以,我们要改造isConnectionException这个方法;对于SocketTimeoutException,不是全都重试,只重试msg为connect timed out的Exception。同时,SocketTimeoutException可能会被封装,我们为了简单,只通过msg进行判断:

public boolean isConnectionException(Throwable e)
{
    return Utils.isPresentAsCause(e, connectionRelated)
            || e.getMessage().contains("connect timed out");
}

这段代码,也提了Pull Request

修改替换源代码后,线上问题解决