# Java线程池

# refer

要是以前有人这么讲线程池,我早就该明白了! (opens new window)

# 原理分析

Java自带线程池的基本原理在网上很多资料,这里只做简单说明。

线程池流程说明:

图片

线程池相关类的依赖关系:

image-20220604042232290

ThreadPoolExecutor内的消费任务模型

img

# 使用线程池

# 改造线程池

# 源码解读

我们从使用场景切入,对源码进行学习。

项目中使用线程池通常会先构造一个ThreadPoolExecutor,再通过submit提交task。

ThreadPoolExecutor pool = 
    new ThreadPoolExecutor(4, 4 * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
// 通常submit一个Callable,submit是AbstractExecutorService中实现的。
pool.submit(() -> null);

开始进入JDK的线程池代码:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 构造一个RunnableFuture,它实现了Runnable、Future两个接口
    // 使该task既可以被当成Runnable执行,又可以作为Future结果返回
    RunnableFuture<T> ftask = newTaskFor(task);
    // 该方法是Executor接口类提供,由ThreadPoolExecutor实现
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // FutureTask是RunnableFuture的一个具体实现
    return new FutureTask<T>(callable);
}

ThreadPoolExecutor中的相关实现

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl是线程池的状态组合值,代表了“状态+线程个数”
    int c = ctl.get();
    // 若工作线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 创建新线程运行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 1.判断线程是否运行 2.是运行的话则添加任务到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 1.再次检查任务是否处于运行状态 2.不处于运行状态的话则从队列中移除任务,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 若线程池为空,添加一个新线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 最后尝试添加线程,若失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

// 该方法主要完成的工作:增加线程、添加任务并执行
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 检查队列是否只在必要时为空(判断线程状态,且队列不为空)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // 循环CAS增加线程个数
        for (;;) {
            int wc = workerCountOf(c);
            // 若线程个数超限,返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS增加线程数,同时只能增加一个线程,成功则跳回retry
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // CAS失败后,重新检查运行状态,若状态发生变化,则跳回retry重新执行,否则继续执行内层的CAS循环
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // 经过上面逻辑,CAS增加线程数一定是成功了
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 构造worker,会为其new一个新线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 增加独占锁,因为workers本身是用HashSet装的,需要保证线程安全
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 重新检查线程状态,避免线程池被shutdown
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将worker添加进HashSet中
                    workers.add(w);
                    int s = workers.size();
                    // 更新pool中最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加完成后启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 若worker没启动则回滚创建过程
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 回滚worker的创建
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

ThreadPoolExecutor内部的Worker类实现

// 实现了AQS,也就支持了锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // 用于suppress javac warning,实际不参与序列化
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // 执行runWorker方法之前,禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

// 内部Worker类会调用外部类的runWorker方法,实现了任务的执行
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 获取当前任务,从队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // hook,支持在执行任务前做一些统计
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // hook,在任务执行后
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 统计当前Worker完成了多少任务
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 执行清理工作
        processWorkerExit(w, completedAbruptly);
    }
}

// worker线程会不断调用getTask方法来获取task
private Runnable getTask() {
    // poll()是否超时
    boolean timedOut = false;
    // 循环获取
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 线程池是终止状态,且队列为空,返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 判断worker是否会被淘汰
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 从阻塞队列中获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

ThreadPoolExecutor中定义状态使用了高低位的方式,将线程池状态和线程数量保存在一个ctl变量上,就不用使用其他开销大的同步方式了

// 记录线程池状态和线程数量,前3位表示线程状态,后29位表示线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY是一个后29位都是1的数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态,也维持了一个递增的顺序
// -1的二进制全是1,左移29位,只剩高3位,最高的4位是1110
private static final int RUNNING    = -1 << COUNT_BITS;
// 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 0010
private static final int STOP       =  1 << COUNT_BITS;
// 0100
private static final int TIDYING    =  2 << COUNT_BITS;
// 0110
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取线程状态、工作线程数的方法
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过rs、wc还原成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

c & CAPACITY能获取后29位在变量c的值,c & ~CAPACITY取反后再与上c就能获得线程状态的值。

ThreadPoolExecutor中状态变量ctl图

img

状态流转图

源码中的注释摘抄:

RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING When pool is empty TIDYING -> TERMINATED When the terminated() hook method has completed

img

  • RUNNING:运行状态,接受新的任务并且处理队列中的任务。
  • SHUTDOWN:关闭状态(调用了 shutdown 方法)。不接受新任务,,但是要处理队列中的任务。
  • STOP:停止状态(调用了 shutdownNow 方法)。不接受新任务,也不处理队列中的任务,并且要中断正在处理的任务。
  • TIDYING:所有的任务都已终止了,workerCount 为 0,线程池进入该状态后会调terminated() 方法进入 TERMINATED 状态。
  • TERMINATED:终止状态,terminated() 方法调用结束后的状态。

// todo FutureTask的实现作用体现

# ThreadPoolExecutor

看过阿里的Java开发手册的同学知道,其【强制】规定了不允许使用Executors创建线程池,而应该通过ThreadPoolExecutor方式创建,这样能让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

ThreadPoolExecutor有以下几个提交任务的方式:

  • submit(Runnable)
  • submit(Runnable, T)
  • submit(Callable)
  • execute(Runnable)
  • invokeAll(tasks)
  • invokeAll(tasks, time)
  • invokeAny(tasks)
  • invokeAny(tasks)

上面的方法只有execute是无法获取到返回值的,平时若需要获取到返回值通常使用submint。

# Executor

ScheduledExecutorService

# Executors

Java Executors的四种线程 (opens new window)

Executors提供了哪些线程池:

  • newCachedThreadPool
  • newFixedThreadPool
  • newScheduledThreadPool
  • newSingleThreadExecutor

接下来就一一介绍

# newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收

ExecutorService executorService = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
  final int index = i;
  try {
    Thread.sleep(index * 1000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  executorService.execute(new Runnable() {
    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + "," +index);
    }
  });
}
//控制台信息
pool-1-thread-1,0
pool-1-thread-1,1
pool-1-thread-1,2
pool-1-thread-1,3
pool-1-thread-1,4

# newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for(int i=0;i<5;i++) {
  final int index = i;
    fixedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
      try {
        System.out.println(Thread.currentThread().getName() + ", " + index);
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  });
}
//控制台信息
pool-1-thread-1,0
pool-1-thread-2,1
pool-1-thread-3,2
pool-1-thread-4,3
pool-1-thread-1,4

# newScheduledThreadPool

创建一个定长线程池,支持周期和定时任务

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
System.out.println("before:" + System.currentTimeMillis()/1000);
scheduledThreadPool.schedule(new Runnable() {
  @Override
  public void run() {
    System.out.println("延迟3秒执行的哦 :" + System.currentTimeMillis()/1000);
  }
}, 3, TimeUnit.SECONDS);
System.out.println("after :" +System.currentTimeMillis()/1000);
//控制台信息
before:1518012703
after :1518012703
延迟3秒执行的哦 :1518012706
System.out.println("before:" + System.currentTimeMillis()/1000);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
    System.out.println("延迟1秒之后,3秒执行一次:" +System.currentTimeMillis()/1000);
  }
}, 1, 3, TimeUnit.SECONDS);
System.out.println("after :" +System.currentTimeMillis()/1000);

# newSingleThreadExecutor

创建一个单线程化的线程池,只会用工作线程来执行任务,保证顺序

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i=0;i<10;i++) {
  final int index = i;
  singleThreadExecutor.execute(new Runnable() {
    @Override
    public void run() {
      try {
         System.out.println(Thread.currentThread().getName() + "," + index);
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  });
}

//控制台信息
pool-1-thread-1,0
pool-1-thread-1,1
pool-1-thread-1,2
pool-1-thread-1,3
pool-1-thread-1,4

# 关闭Executors线程池

Executors提供了两个方法关闭线程池,shutdown或shutdownNow。

两者的区别如下:

  1. shutdown不会直接关闭线程池,只是将线程池的状态设置为"shutdown"。
  2. shutdownNow是遍历线程池中所有工作线程,逐个调用他们的interrupt()来中断线程(无法响应中断的线程可能永远无法停止)。

需要注意:

  1. isShutdown在调用两个关闭方法时就会置为true
  2. isTerminated只有当所有线程关闭后才会置为true

# 线程池如何判断任务都执行完了?

在需要并行提高业务逻辑效率的情况下, 一般都会使用到线程池(防止过多的创建线程), 但往往在任务执行完后, 需要继续做其他业务逻辑, 此时则需要知道线程池中的任务是否都已经执行完.

# 使用shutdown()和isTerminated()

shutdown()是告诉线程池, 不允许接收新的任务了, 等待将正在执行的任务和队列中的任务完成后, 就完成你作为线程池的使命了. 使用isTerminated()获取到true需要两个条件:

  1. 调用过shutdown()
  2. 线程池的任务都已经执行完成

# 使用awaitTermination(1, TimeUnit.DAYS)

是isTerminated()的一种优雅用法, 调用awaitTermination()会产生阻塞, 需要等待满足条件后才放行(条件与isTerminated()基本一样):

  1. 调用过shutdown()
  2. 线程池的任务都已经执行完成, 或者设置的超时时间过了.

如果超时时间过了, 会直接放行.

# 获取Future集合来判断是否任务都执行完成

将Runnable或者Callable,submit到线程池中, 每一次submit的同时会返回一个Future Future可用于:

  1. 判断任务是否完成
  2. 获取返回结果
  3. 取消任务

将submit返回的Future收集起来, 用来判断任务是否都完成了.

代码示例

// 创建线程池
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1000), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
Runnable runnable = () -> {
    System.out.println(Thread.currentThread().getName());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
// 提交线程任务并收集Future
List<Future> list = new ArrayList<>();
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));

// 通过future.isDone()判断线程任务是否执行完毕
int size = list.size();
while (true){
    int equalSize = 0;
    for (Future future : list) {
        if(future.isDone()){
            equalSize++;
        }
    }
    System.out.println("equalSize="+equalSize + "size="+size);
    if(equalSize == size){
        break;
    }
}

pool.shutdown();//gracefully shutdown

# 使用CountDownLatch

使用CountDownLatch需要提前知道总数

// 创建线程池(省略)

//创建CountDownLatch 需传入计数器
CountDownLatch countDownLatch = new CountDownLatch(2);
Runnable runnable = () -> {
    System.out.println(Thread.currentThread().getName());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //计数器减一
    countDownLatch.countDown();
};
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);

System.out.println("countDownLatch start await...");
//线程阻塞等待计数器为0
countDownLatch.await(100, TimeUnit.SECONDS);
System.out.println("countDownLatch has been executed...");
//gracefully shutdown
pool.shutdown();

# 如何配置线程池大小

关于线程池的大小,并没有一个需要严格遵守的“金规铁律”,按照任务性质,大概可以分为CPU密集型任务、IO密集型任务和混合型任务。

  • CPU密集型任务:CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。
  • IO密集型任务:IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。
  • 混合型任务:混合型任务可以按需拆分成CPU密集型任务和IO密集型任务。

当然,这个只是建议,实际上具体怎么配置,还要结合事前评估和测试、事中监控来确定一个大致的线程线程池大小。线程池大小也可以不用写死,使用动态配置的方式,以便调整。

任务队列

任务队列一般建议使用有界队列,无界队列可能会出现队列里任务无限堆积,导致内存溢出的异常。

# 线程池监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。

可以通过线程池提供的参数和方法来监控线程池:

  • getActiveCount() :线程池中正在执行任务的线程数量

  • getCompletedTaskCount() :线程池已完成的任务数量,该值小于等于 taskCount

  • getCorePoolSize() :线程池的核心线程数量

  • getLargestPoolSize():线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize

  • getMaximumPoolSize():线程池的最大线程数量

  • getPoolSize() :线程池当前的线程数量

  • getTaskCount() :线程池已经执行的和未执行的任务总数

还可以通过扩展线程池来进行监控:

  • 通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法。
  • 也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。
修改于: 8/11/2022, 3:17:56 PM