Java 线程池框架核心代码分析

澳门新葡亰3522平台游戏 1

前言

多线程编程中,为每个任务分配一个线程是不现实的,线程创建的开销和资源消耗都是很高的。线程池应运而生,成为我们管理线程的利器。Java
通过Executor接口,提供了一种标准的方法将任务的提交过程和执行过程解耦开来,并用Runnable表示任务。

下面,我们来分析一下 Java 线程池框架的实现ThreadPoolExecutor

下面的分析基于JDK1.7

引言


为什么引入线程池技术?

对于服务端的程序,经常面对的是执行时间较短、工作内容较为单一的任务,需要服务端快速处理并返回接口。假若服务端每次接收到一个任务,就创建一个线程,然后执行,这种方式在原型阶段是不错的选择,但是面对成千上万的任务提交进服务器时,这个时候将会创建数以万记的线程,这很明显不是一个好的选择。为什么呢?

  • 第一,频繁的线程切换会使操作系统频繁的进行上下文切换,增加了系统的负载;
  • 第二,线程的创建和销毁是需要耗费系统资源的,这样子很明显浪费了系统资源。

线程池技术很好的解决了这个问题,它预先创建一定数量的线程,用户不能直接控制线程的创建和销毁,重复使用固定或者较为固定数目的线程来完成任务的执行。这样做的好处:

  • 消除了频繁创建和销毁线程的系统资源开销;
  • 面对过量任务的提交能够平缓劣化。

生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位来表示运行状态,分别是:

  1. RUNNING:接收新任务,并且处理任务队列中的任务
  2. SHUTDOWN:不接收新任务,但是处理任务队列的任务
  3. STOP:不接收新任务,不出来任务队列,同时中断所有进行中的任务
  4. TIDYING:所有任务已经被终止,工作线程数量为
    0,到达该状态会执行terminated()
  5. TERMINATED:terminated()执行完毕

澳门新葡亰3522平台游戏 1

状态转换图

ThreadPoolExecutor中用原子类来表示状态位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ThreadPoolExcutor源码解析

在看具体的源码之前,先给一个线程池使用案例

线程池使用案例

  1. 创建线程池对象;
  2. executor.submit(Runnable task)提交10个任务;
  3. executor.submit(Callable<T> task)提交5个任务;
  4. 所有线程的管理都由线程池来原理,程序员不需要关注线程的创建销毁。

线程池模型

构造方法

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize, 
                          long keepAliveTime, 
                          TimeUnit unit, 
                          BlockingQueue<Runnable> workQueue, 
                          RejectedExecutionHandler handler) {    
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}

核心参数:

  • corePoolSize:核心线程数,线程池里一直不会被销毁的线程数量;

  • maximumPoolSize:最大线程数量;

  • keepAliveTime:非核心线程空闲时的存活时间,该参数只有在线程数量 >
    corePoolSize情况下才有用;

  • unit:keepAlive时间单位;

  • workQueue:工作队列,JDK提供这几种工作队列:

    • ArrayBlockingQueue:基于数组的有界阻塞队列,任务以FIFO顺序排序;
    • LinkedBlockingQueue:基于链表的阻塞队列,任务以FIFO顺序排列,吞吐量优于ArrayBlockingQueue,在使用时需要注意,此阻塞队列在不设置大小的时候,默认的长度是Integer.MAX_VALUE
    • PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含任务的排序不是FIFO,而是依据任务的自然排序顺序或者是构造函数的Comparator决定的顺序;
    • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的,典型的生产者-消费者模型,它不存储元素,每一次的插入必须要等另一个线程的移除操作完成。
  • threadFactory:创建线程工厂,可以自定义线程工厂给线程池里的线程设置一个自定义线程名。

    DefaultThreadFactory源码

  • handler:饱和策略,假如线程池已满,并且没有空闲的线程,这个时候不再允许提交任务到线程池,线程池提供了4中策略,至于具体采用哪种策略还是自定义策略,具体情况具体分析。

    • AbortPolicy:拒绝提交,直接抛出异常,也是默认的饱和策略;
    • CallerRunsPolicy:线程池还未关闭时,用调用者的线程执行任务;
    • DiscardPolicy:丢掉提交任务;
    • DiscardOldestPolicy:线程池还未关闭时,丢掉阻塞队列最久为处理的任务,并且执行当前任务。

核心参数

  • corePoolSize:最小存活的工作线程数量(如果设置allowCoreThreadTimeOut,那么该值为
    0)
  • maximumPoolSize:最大的线程数量,受限于CAPACITY
  • keepAliveTime:对应线程的存活时间,时间单位由TimeUnit指定
  • workQueue:工作队列,存储待执行的任务
  • RejectExecutionHandler:拒绝策略,线程池满后会触发

线程池的最大容量CAPACITY中的前三位用作标志位,也就是说工作线程的最大容量为(2^29)-1

线程池内部状态

线程池内部状态

线程池用ctl的低29位表示线程池中的线程数,高3位表示当前线程状态,后续假如想要增大这个值,可以将AtomicInteger改成AtomicLong。

  • RUNNING:运行状态,高3位为111;
  • SHUTDOWN:关闭状态,高3位为000,在此状态下,线程池不再接受新任务,但是仍然处理阻塞队列中的任务;
  • STOP:停止状态,高3位为001,在此状态下,线程池不再接受新任务,也不会处理阻塞队列中的任务,正在运行的任务也会停止;
  • TIDYING:高3位为010;
  • TERMINATED:终止状态,高3位为011。

接下来就以submit方法入手,分析一下相关源码。

四种模型

  • CachedThreadPool:一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何的限制。
  • FixedThreadPool:一个固定大小的线程池,提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的大小将不再变化。
  • SingleThreadPool:一个单线程的线程池,它只有一个工作线程来执行任务,可以确保按照任务在队列中的顺序来串行执行,如果这个线程异常结束将创建一个新的线程来执行任务。
  • ScheduledThreadPool:一个固定大小的线程池,并且以延迟或者定时的方式来执行任务,类似于Timer。

submit任务提交

public Future<?> submit(Runnable task) {
  //提交的task为null,抛出空指针异常   
  if (task == null)
       throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //执行任务
    execute(ftask);
    return ftask;
}

整个任务的提交核心都在任务执行这部分,执行任务,拿到返回值。

执行任务 execute

核心逻辑:

  1. 当前线程数量
    corePoolSize,直接开启新的核心线程执行任务addWorker(command, true)
  2. 当前线程数量 >= corePoolSize,且任务加入工作队列成功
    1. 检查线程池当前状态是否处于RUNNING
    2. 如果否,则拒绝该任务
    3. 如果是,判断当前线程数量是否为 0,如果为 0,就增加一个工作线程。
  3. 开启普通线程执行任务addWorker(command, false),开启失败就拒绝该任务

从上面的分析可以总结出线程池运行的四个阶段:

  1. poolSize < corePoolSize 且队列为空,此时会新建线程来处理提交的任务
  2. poolSize == corePoolSize,此时提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。
  3. poolSize == corePoolSize,并且队列已满,此时也会新建线程来处理提交的任务,但是poolSize < maxPoolSize
  4. poolSize == maxPoolSize,并且队列已满,此时会触发拒绝策略
任务执行execute
public void execute(Runnable command) {
  if (command == null)
     throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
  } else if (!addWorker(command, false))
      reject(command);
}

具体的执行流程如下:

  • 通过workerCountOf计算出当前线程池的线程数,如果线程数小于corePoolSize,执行addWork方法创建新的线程执行任务;
  • 如果当前线程池线程数大于coreSize,向队列里添加task,不继续增加线程;
  • workQueue.offer失败时,也就是说现在队列已满,不能再向队列里放,此时工作线程大于等于corePoolSize,创建新的线程执行该task;
  • 执行addWork失败,执行reject方法处理该任务。

总结一下,对于使用线程池的外部来说,线程池的机制是这样的:

  1. 如果正在运行的线程数 <
    coreSize,马上创建线程执行该task,不排队等待;
  2. 澳门新葡亰3522平台游戏,如果正在运行的线程数 >= coreSize,把该task放入队列;
  3. 如果队列已满 && 正在运行的线程数 <
    maximumPoolSize,创建新的线程执行该task;
  4. 如果队列已满 && 正在运行的线程数 >=
    maximumPoolSize,线程池调用handler的reject方法拒绝本次提交。