# Java线程池
# refer
要是以前有人这么讲线程池,我早就该明白了! (opens new window)
# 原理分析
Java自带线程池的基本原理在网上很多资料,这里只做简单说明。
线程池流程说明:

线程池相关类的依赖关系:

ThreadPoolExecutor内的消费任务模型

# 使用线程池
# 改造线程池
# 源码解读
我们从使用场景切入,对源码进行学习。
项目中使用线程池通常会先构造一个ThreadPoolExecutor,再通过submit提交task。
ThreadPoolExecutor pool =
new ThreadPoolExecutor(4, 4 * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
// 通常submit一个Callable,submit是AbstractExecutorService中实现的。
pool.submit(() -> null);
开始进入JDK的线程池代码:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 构造一个RunnableFuture,它实现了Runnable、Future两个接口
// 使该task既可以被当成Runnable执行,又可以作为Future结果返回
RunnableFuture<T> ftask = newTaskFor(task);
// 该方法是Executor接口类提供,由ThreadPoolExecutor实现
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// FutureTask是RunnableFuture的一个具体实现
return new FutureTask<T>(callable);
}
ThreadPoolExecutor中的相关实现
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl是线程池的状态组合值,代表了“状态+线程个数”
int c = ctl.get();
// 若工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建新线程运行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 1.判断线程是否运行 2.是运行的话则添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 1.再次检查任务是否处于运行状态 2.不处于运行状态的话则从队列中移除任务,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 若线程池为空,添加一个新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 最后尝试添加线程,若失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 该方法主要完成的工作:增加线程、添加任务并执行
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查队列是否只在必要时为空(判断线程状态,且队列不为空)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 循环CAS增加线程个数
for (;;) {
int wc = workerCountOf(c);
// 若线程个数超限,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加线程数,同时只能增加一个线程,成功则跳回retry
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// CAS失败后,重新检查运行状态,若状态发生变化,则跳回retry重新执行,否则继续执行内层的CAS循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 经过上面逻辑,CAS增加线程数一定是成功了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 构造worker,会为其new一个新线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 增加独占锁,因为workers本身是用HashSet装的,需要保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新检查线程状态,避免线程池被shutdown
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将worker添加进HashSet中
workers.add(w);
int s = workers.size();
// 更新pool中最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加完成后启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 若worker没启动则回滚创建过程
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 回滚worker的创建
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
ThreadPoolExecutor内部的Worker类实现
// 实现了AQS,也就支持了锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 用于suppress javac warning,实际不参与序列化
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 执行runWorker方法之前,禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
// 内部Worker类会调用外部类的runWorker方法,实现了任务的执行
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 获取当前任务,从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// hook,支持在执行任务前做一些统计
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// hook,在任务执行后
afterExecute(task, thrown);
}
} finally {
task = null;
// 统计当前Worker完成了多少任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 执行清理工作
processWorkerExit(w, completedAbruptly);
}
}
// worker线程会不断调用getTask方法来获取task
private Runnable getTask() {
// poll()是否超时
boolean timedOut = false;
// 循环获取
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池是终止状态,且队列为空,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断worker是否会被淘汰
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从阻塞队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
ThreadPoolExecutor中定义状态使用了高低位的方式,将线程池状态和线程数量保存在一个ctl变量上,就不用使用其他开销大的同步方式了
// 记录线程池状态和线程数量,前3位表示线程状态,后29位表示线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY是一个后29位都是1的数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态,也维持了一个递增的顺序
// -1的二进制全是1,左移29位,只剩高3位,最高的4位是1110
private static final int RUNNING = -1 << COUNT_BITS;
// 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0010
private static final int STOP = 1 << COUNT_BITS;
// 0100
private static final int TIDYING = 2 << COUNT_BITS;
// 0110
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程状态、工作线程数的方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过rs、wc还原成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
c & CAPACITY能获取后29位在变量c的值,c & ~CAPACITY取反后再与上c就能获得线程状态的值。
ThreadPoolExecutor中状态变量ctl图

状态流转图
源码中的注释摘抄:
RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING When pool is empty TIDYING -> TERMINATED When the terminated() hook method has completed

- RUNNING:运行状态,接受新的任务并且处理队列中的任务。
- SHUTDOWN:关闭状态(调用了 shutdown 方法)。不接受新任务,,但是要处理队列中的任务。
- STOP:停止状态(调用了 shutdownNow 方法)。不接受新任务,也不处理队列中的任务,并且要中断正在处理的任务。
- TIDYING:所有的任务都已终止了,workerCount 为 0,线程池进入该状态后会调terminated() 方法进入 TERMINATED 状态。
- TERMINATED:终止状态,terminated() 方法调用结束后的状态。
// todo FutureTask的实现作用体现
# ThreadPoolExecutor
看过阿里的Java开发手册的同学知道,其【强制】规定了不允许使用Executors创建线程池,而应该通过ThreadPoolExecutor方式创建,这样能让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
ThreadPoolExecutor有以下几个提交任务的方式:
- submit(Runnable)
- submit(Runnable, T)
- submit(Callable)
- execute(Runnable)
- invokeAll(tasks)
- invokeAll(tasks, time)
- invokeAny(tasks)
- invokeAny(tasks)
上面的方法只有execute是无法获取到返回值的,平时若需要获取到返回值通常使用submint。
# Executor
ScheduledExecutorService
# Executors
Java Executors的四种线程 (opens new window)
Executors提供了哪些线程池:
- newCachedThreadPool
- newFixedThreadPool
- newScheduledThreadPool
- newSingleThreadExecutor
接下来就一一介绍
# newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收
ExecutorService executorService = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "," +index);
}
});
}
//控制台信息
pool-1-thread-1,0
pool-1-thread-1,1
pool-1-thread-1,2
pool-1-thread-1,3
pool-1-thread-1,4
# newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for(int i=0;i<5;i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ", " + index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//控制台信息
pool-1-thread-1,0
pool-1-thread-2,1
pool-1-thread-3,2
pool-1-thread-4,3
pool-1-thread-1,4
# newScheduledThreadPool
创建一个定长线程池,支持周期和定时任务
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
System.out.println("before:" + System.currentTimeMillis()/1000);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("延迟3秒执行的哦 :" + System.currentTimeMillis()/1000);
}
}, 3, TimeUnit.SECONDS);
System.out.println("after :" +System.currentTimeMillis()/1000);
//控制台信息
before:1518012703
after :1518012703
延迟3秒执行的哦 :1518012706
System.out.println("before:" + System.currentTimeMillis()/1000);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("延迟1秒之后,3秒执行一次:" +System.currentTimeMillis()/1000);
}
}, 1, 3, TimeUnit.SECONDS);
System.out.println("after :" +System.currentTimeMillis()/1000);
# newSingleThreadExecutor
创建一个单线程化的线程池,只会用工作线程来执行任务,保证顺序
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i=0;i<10;i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "," + index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//控制台信息
pool-1-thread-1,0
pool-1-thread-1,1
pool-1-thread-1,2
pool-1-thread-1,3
pool-1-thread-1,4
# 关闭Executors线程池
Executors提供了两个方法关闭线程池,shutdown或shutdownNow。
两者的区别如下:
- shutdown不会直接关闭线程池,只是将线程池的状态设置为"shutdown"。
- shutdownNow是遍历线程池中所有工作线程,逐个调用他们的interrupt()来中断线程(无法响应中断的线程可能永远无法停止)。
需要注意:
- isShutdown在调用两个关闭方法时就会置为true
- isTerminated只有当所有线程关闭后才会置为true
# 线程池如何判断任务都执行完了?
在需要并行提高业务逻辑效率的情况下, 一般都会使用到线程池(防止过多的创建线程), 但往往在任务执行完后, 需要继续做其他业务逻辑, 此时则需要知道线程池中的任务是否都已经执行完.
# 使用shutdown()和isTerminated()
shutdown()是告诉线程池, 不允许接收新的任务了, 等待将正在执行的任务和队列中的任务完成后, 就完成你作为线程池的使命了. 使用isTerminated()获取到true需要两个条件:
- 调用过shutdown()
- 线程池的任务都已经执行完成
# 使用awaitTermination(1, TimeUnit.DAYS)
是isTerminated()的一种优雅用法, 调用awaitTermination()会产生阻塞, 需要等待满足条件后才放行(条件与isTerminated()基本一样):
- 调用过shutdown()
- 线程池的任务都已经执行完成, 或者设置的超时时间过了.
如果超时时间过了, 会
直接放行.
# 获取Future集合来判断是否任务都执行完成
将Runnable或者Callable,submit到线程池中, 每一次submit的同时会返回一个Future Future可用于:
- 判断任务是否完成
- 获取返回结果
- 取消任务
将submit返回的Future收集起来, 用来判断任务是否都完成了.
代码示例
// 创建线程池
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1000), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 提交线程任务并收集Future
List<Future> list = new ArrayList<>();
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
list.add(pool.submit(runnable));
// 通过future.isDone()判断线程任务是否执行完毕
int size = list.size();
while (true){
int equalSize = 0;
for (Future future : list) {
if(future.isDone()){
equalSize++;
}
}
System.out.println("equalSize="+equalSize + "size="+size);
if(equalSize == size){
break;
}
}
pool.shutdown();//gracefully shutdown
# 使用CountDownLatch
使用CountDownLatch需要提前知道
总数
// 创建线程池(省略)
//创建CountDownLatch 需传入计数器
CountDownLatch countDownLatch = new CountDownLatch(2);
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//计数器减一
countDownLatch.countDown();
};
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
pool.execute(runnable);
System.out.println("countDownLatch start await...");
//线程阻塞等待计数器为0
countDownLatch.await(100, TimeUnit.SECONDS);
System.out.println("countDownLatch has been executed...");
//gracefully shutdown
pool.shutdown();
# 如何配置线程池大小
关于线程池的大小,并没有一个需要严格遵守的“金规铁律”,按照任务性质,大概可以分为CPU密集型任务、IO密集型任务和混合型任务。
- CPU密集型任务:CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。
- IO密集型任务:IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。
- 混合型任务:混合型任务可以按需拆分成CPU密集型任务和IO密集型任务。
当然,这个只是建议,实际上具体怎么配置,还要结合事前评估和测试、事中监控来确定一个大致的线程线程池大小。线程池大小也可以不用写死,使用动态配置的方式,以便调整。
任务队列
任务队列一般建议使用有界队列,无界队列可能会出现队列里任务无限堆积,导致内存溢出的异常。
# 线程池监控
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。
可以通过线程池提供的参数和方法来监控线程池:
getActiveCount() :线程池中正在执行任务的线程数量
getCompletedTaskCount() :线程池已完成的任务数量,该值小于等于 taskCount
getCorePoolSize() :线程池的核心线程数量
getLargestPoolSize():线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize
getMaximumPoolSize():线程池的最大线程数量
getPoolSize() :线程池当前的线程数量
getTaskCount() :线程池已经执行的和未执行的任务总数
还可以通过扩展线程池来进行监控:
- 通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法。
- 也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。