您现在的位置是:亿华云 > IT科技类资讯

面试官:线程池是如何做到线程复用的?有了解过吗?

亿华云2025-10-03 02:15:10【IT科技类资讯】4人已围观

简介前言我们今天探讨ThreadPoolExecutor,一起来看下吧!ThreadPoolExecutor中是如何做到线程复用的?我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之

前言

我们今天探讨ThreadPoolExecutor,面试一起来看下吧!

ThreadPoolExecutor中是官线过如何做到线程复用的?

我们知道,一个线程在创建的程池程复时候会指定一个线程任务,当执行完这个线程任务之后,到线线程自动销毁。用的有解但是面试线程池却可以复用线程,一个线程执行完线程任务后不销毁,官线过继续执行另外的程池程复线程任务。那么它是到线如何做到的?这得从addWorker()说起。

addWorker()

先看上半部分addWorker()。用的有解

private boolean addWorker(Runnable firstTask,面试 boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// 对边界设定的检查

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

​retry:可能有些同学没用过,它只是官线过一个标记,它的程池程复下一个标记就是for循环,在for循环里面调用continue/break再紧接着retry标记时,就表示从这个地方开始执行continue/break操作,到线但这不是用的有解我们关注的重点。

从上面的代码,我们可以看出,ThreadPoolExecutor在创建线程时,云服务器会将线程封装成「工作线程worker」,并放入「工作线程组」中,然后这个worker反复从阻塞队列中拿任务去执行。这个addWorker是excute方法中调用的。

我们接着看下半部分。private boolean addWorker(Runnable firstTask, boolean core) {

// 上半部分

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

// core是ture,需要创建的线程为核心线程,则先判断当前线程是否大于核心线程

// 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数

// 如果不小于,则返回false

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

// 下半部分

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

// 创建worker对象

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

// 获取线程全局锁

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

int rs = runStateOf(ctl.get());

// 判断线程池状态

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

// 将当前线程添加到线程组

workers.add(w);

int s = workers.size();

// 如果线程组中的线程数大于最大线程池数 largestPoolSize赋值s

if (s > largestPoolSize)

largestPoolSize = s;

// 添加成功

workerAdded = true;

}

} finally {

mainLock.unlock();

}

// 添加成功后执行线程

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

// 添加失败后执行 addWorkerFailed

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

再看 addWorkerFailed(),与上边相反,相当于一个回滚操作,会移除失败的工作线程。

private void addWorkerFailed(Worker w) {

// 同样需要全局锁

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (w != null)

workers.remove(w);

decrementWorkerCount();

tryTerminate();

} finally {

mainLock.unlock();

}

}

Worker

我们接着看Worker对象。

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

/

**

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

*/

private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */

final Thread thread;

/** Initial task to run. Possibly null. */

Runnable firstTask;

/** Per-thread task counter */

volatile long completedTasks;

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

/** Delegates main run loop to outer runWorker */

public void run() {

runWorker(this);

}

//.....

// 省略下边代码

}

Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程,回过头想想addWorker()里为啥可以t.start()应该很清楚了吧, 并且在构造方法中调用了线程工厂创建了一个线程实例,我们上节讲过线程工厂。其实这也不是关注的亿华云计算重点,重点是这个runWorker()。

final void runWorker(Worker w) {

// 获取当前的线程实例

Thread wt = Thread.currentThread();

// 直接从第一个任务开始执行

Runnable task = w.firstTask;

// 获取完之后把worker的firstTask置为null 防止下次获取到

w.firstTask = null;

// 线程启动之后,通过unlock方法释放锁

w.unlock(); // allow interrupts

// 线程异常退出时 为 true

boolean completedAbruptly = true;

try {

// Worker执行firstTask或从workQueue中获取任务,直到任务为空

while (task != null || (task = getTask()) != null) {

// 获取锁以防止在任务执行过程中发生中断

w.lock();

// 判断边界值 如果线程池中断 则中断线程

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

// 相当于钩子方法

beforeExecute(wt, task);

Throwable thrown = null;

try {

// 执行任务

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

​首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while循环中,worker会不断地调用getTask方法从「阻塞队列」中获取任务然后调用task.run()执行任务,从而达到「复用线程」的目的。只要getTask方法不返回null,此线程就不会退出。

我们接着看getTask()​。

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

int wc = workerCountOf(c);

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。

// 如果有设置允许线程超时或者线程数量超过了核心线程数量,并且线程在规定时间内均未poll到任务且队列为空则递减worker数量

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

if (compareAndDecrementWorkerCount(c))

return null;

continue;

}

try {

// 如果timed为true,则会调用workQueue的poll方法获取任务.

// 超时时间是keepAliveTime。如果超过keepAliveTime时长,

// 如果timed为false, 则会调用workQueue的take方法阻塞在当前。

// 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。云服务器提供商

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

大家有没有想过这里为啥要用take和poll,它们都是出队的操作,这么做有什么好处?

take & poll

​我们说take()方法会将核心线程阻塞挂起,这样一来它就不会占用太多的cpu资源,直到拿到Runnable 然后返回。

如果「allowCoreThreadTimeOut」设置为true,那么核心线程就会去调用poll方法,因为poll可能会返回null,所以这时候核心线程满足超时条件也会被销毁。

​非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超时还没有拿到,下一次循环判断「compareAndDecrementWorkerCount」就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收 。

再回头看一下runWorker()是不是设计的很巧妙。

结束语

本节内容不是很好理解,想继续探讨的同学可以继续阅读它的源码,这部分内容了解一下就好,其实我们从源码中可以看到大量的线程状态检查,代码写的很健壮,可以从中学习一下。

很赞哦!(866)