Java 线程池 Executors 教程

/ Java / 没有评论 / 1619浏览

Java 线程池 Executors 教程

在现代的互联网应用发展过程中,三高特性,经常性的会在面试中被问起。这是一种趋势,不管你要进入的公司是不是具有这些特性。今天我们要说的是性能方便多线程的使用!说到多线程,大家一定会想到线程池,下面一起来总结一下多线程的好处!

线程池的好处

对于线程池,java 中提供了一个 Executors 线程池,我们今天重点讲它!

Executors 类关系图

在上图中,我们最常使用的是 Executors,它主要用来创建线程池使用线程。它包含了一个 Executor 框架,它是一个根据一组执行策略的调用调度执行和控制异步任务的框架,目的是提供一种将任务提交与任务如何运行分离开的机制。它包含了三个 Executor 接口:

在《阿里巴巴开发手册》中有一项规则写到:“【强制】线程池不允许使用 Executors去创建,而是通过 ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。……”

我们可以看到,手册上还做了一个说明!具体如下:

Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPool和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

从上面这个说明可以看出,阿里巴巴内部肯定有人使用 Executors 去创建线程池,然而由于使用不当,产生了 OOM 问题。我猜这样用的肯定不止一个人,而且是引起的问题比较大,内部肯定做了特例进行学习,于是就被写进了手册。

阿里巴巴的开发人员能犯的错,我们可能也会犯,所以呢?我也不太推荐大家直接用 Executors 去直接创建线程池!

ThreadPoolExecutor

既然上面提到了使用 ThreadPoolExecutor 来创建线程池,那我们就一起来看看它。

ThreadPoolExecutor一共有七个参数,这七个参数配合起来,构成了线程池强大的功能。

当我们提交一个新的任务到线程池,线程池会根据当前池中正在运行的线程数量来决定该任务的处理方式。处理方式有三种: 1、直接切换(SynchronusQueue) 2、无界队列(LinkedBlockingQueue)能够创建的最大线程数为corePoolSize,这时maximumPoolSize就不会起作用了。当线程池中所有的核心线程都是运行状态的时候,新的任务提交就会放入等待队列中。 3、有界队列(ArrayBlockingQueue)最大maximumPoolSize,能够降低资源消耗,但是这种方式使得线程池对线程调度变的更困难。因为线程池与队列容量都是有限的。所以想让线程池的吞吐率和处理任务达到一个合理的范围,又想使我们的线程调度相对简单,并且还尽可能降低资源的消耗,我们就需要合理的限制这两个数量 分配技巧: [如果想降低资源的消耗包括降低cpu使用率、操作系统资源的消耗、上下文切换的开销等等,可以设置一个较大的队列容量和较小的线程池容量,这样会降低线程池的吞吐量。如果我们提交的任务经常发生阻塞,我们可以调整maximumPoolSize。如果我们的队列容量较小,我们需要把线程池大小设置的大一些,这样cpu的使用率相对来说会高一些。但是如果线程池的容量设置的过大,提高任务的数量过多的时候,并发量会增加,那么线程之间的调度就是一个需要考虑的问题。这样反而可能会降低处理任务的吞吐量。

corePoolSize、maximumPoolSize、workQueue 三者关系:如果运行的线程数小于corePoolSize的时候,直接创建新线程来处理任务。即使线程池中的其他线程是空闲的。如果运行中的线程数大于corePoolSize且小于maximumPoolSize时,那么只有当workQueue满的时候才创建新的线程去处理任务。如果corePoolSize与maximumPoolSize是相同的,那么创建的线程池大小是固定的。这时有新任务提交,当workQueue未满时,就把请求放入workQueue中。等待空线程从workQueue取出任务。如果workQueue此时也满了,那么就使用另外的拒绝策略参数去执行拒绝策略。

初始化方法:由七个参数组合成四个初始化方法 ThreadPoolExecutor

其他方法:

  1. execute() 提交任务,交给线程池执行
  2. submit() 提交任务,能够返回执行结果 execute+Future
  3. shutdown() 关闭线程池,等待任务都执行完
  4. shutdownNow() 关闭线程池,不等待任务执行完
  5. getTaskCount() 线程池已执行和未执行的任务总数
  6. getCompleteTaskCount() 已完成的任务数量
  7. getPoolSize() 线程池当前的线程数量
  8. getActiveCount() 当前线程池中正在执行任务的线程数量

线程池线程生命周期

线程池线程生命周期

线程池

线程池原理

这是Executors 创建常用的 4种线程池,但是我在上面说了 需要用ThreadPoolExecutor 来创建,那么怎么办呢? 那我们一起看一下源码把

Executors.newCachedThreadPool

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

源码如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>()
    );
}

Executors.newFixedThreadPool

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);

对应的接口方法如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()
    );
}

Executors.newSingleThreadExecutor

ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

对应的接口源码如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(
            1, 1,0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()
        )
    );
}

Executors.newScheduledThreadPool

ScheduledExecutorService提供了三种方法可以使用: ScheduledExecutorService

举例:

executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        log.warn("schedule run");
    }
}, 1, 3, TimeUnit.SECONDS);//延迟一秒后每隔3秒执行

小扩展:延迟执行任务的操作,java中还有Timer类同样可以实现:

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        log.warn("timer run");
    }
}, new Date(), 5 * 1000);