0%

ThreadPoolExecutor源码解析

线程池程序开发中经常用到,比如网络和本地数据的访问 ,使用线程池能有效的控制线程数量,避免线程的频繁创建和回收。

1.成员变量

1
private final BlockingQueue<Runnable> workQueue;

表示任务队列。

1
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker可以理解为发动机,每个Worker都会开启一个线程,workers表示存放线程的集合。

1
private volatile long  keepAliveTime;

keepAliveTime表示线程活跃时间。因为线程池有核心线程个数,有最大线程个数,当线程池中的线程大于核心线程数时,线程的闲置时间超过keepAliveTime将会被回收。

1
2
3
4
5
private volatile int   corePoolSize;

private volatile int maximumPoolSize;

private volatile int poolSize;

分别表示核心线程个数,最大线程个数,当前线程个数。

1
private volatile ThreadFactory threadFactory;

用于创建线程的工厂。

1
2
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

当线程池中的线程数量和任务数量都达到最大值,就会抛出异常给RejectedExecutionHandler处理。

2.核心方法流程

1
2
3
4
5
6
7
8
9
10
public void execute(Runnable command) {
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}

(1)当当前线程个数小于核心线程个数,则会调用addIfUnderCorePoolSize。
(2)当当前线程个数大于核心线程个数进行下一步处理。

1
2
3
4
5
6
7
8
9
10
11
12
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}

判断前线程个数小于核心线程个数,true则添加线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start();
workerStarted = true;
}
finally {
if (!workerStarted)
workers.remove(w);
}
}
return t;
}

创建新线程并启动。

ThreadPoolExecutor.Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

首先运行初始化的第一个任务,后续从队列中获取任务。

ThreadPoolExecutor.Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

(1)线程已经停止,直接把任务队列中的数据获取出来,不阻塞登录获取。
(2)这里第二个判断条件是回收线程的重要的地方,在约定时间内获取任务,如果没有则返回null。
(3)阻塞等待获取任务。
(4)判断当前线程是否可用回收。

3. 添加任务处理流程

当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果当前线程池中的数量小于corePoolSize,并线程池处于Running状态,创建并添加的任务。
如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。
当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。