您现在的位置是:亿华云 > IT科技
并发编程之ThreadPoolExecutor线程池原理解析
亿华云2025-10-03 02:55:12【IT科技】0人已围观
简介前言在介绍线程池之前,我们先回顾下线程的基本知识。其中线程池包括ThreadPoolExecutor 默认线程和ScheduledThreadPoolExecutor 定时线程池 ,本篇重点介绍Thr
前言
在介绍线程池之前,并发编程我们先回顾下线程的程池基本知识。其中线程池包括ThreadPoolExecutor 默认线程和ScheduledThreadPoolExecutor 定时线程池 ,原理本篇重点介绍ThreadPoolExecutor线程池。解析
线程
线程是并发编程调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,程池JVM使用的原理是KLT模型,Java线程与OS线程保持 1:1 的解析映射关系,也就是并发编程说有一个Java线程也会在操作系统里有一个对应的线程。
内核线程模型

内核线程(KLT):系统内核管理线程(KLT),程池内核保存线程的原理状态和上下文信息,线程阻塞不会引起进程阻塞。解析在多处理器系统上,并发编程多线程在多处理器上并行运行。程池线程的原理创建、调度和管理由内核完成,效率比ULT要慢,比进程操作快。
用户线程模型

用户线程(ULT):用户程序实现,不依赖操作系统核心,应用提供创建、同步、调度和管理线程的函数来控制用户线程。云服务器提供商不需要用户态/内核态切换,速度快。内核对ULT无感知,线程阻塞则进程(包括它的所有线程)阻塞。
Java线程生命状态
Java线程有多种生命状态:
NEW,新建 RUNNABLE,运行 BLOCKED,阻塞 WAITING ,等待 TIMED_WAITING,超时等待 TERMINATED,终结状态切换如下图所示:

Java线程实现方式
Java线程实现方式主要有四种:
继承Thread类 实现Runnable接口、 实现Callable接口通过FutureTask包装器来创建Thread线程、 使用ExecutorService、Callable、Future实现有返回结果的多线程。其中前两种方式线程执行完后都没有返回值,后两种是带返回值的。
继承Thread类创建线程
Thread类本质上是实现了Runnable接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法,它将启动一个新线程,服务器托管并执行run()方法。这种方式实现多线程很简单,通过自己的类直接extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:
public class MyThread extends Thread { public void run() { System.out.println("关注一角钱技术,获取Java架构资料"); } } MyThread myThread1 = new MyThread(); MyThread myThread2 = new MyThread(); myThread1.start(); myThread2.start();实现Runnable接口创建线程
如果自己的类已经extends另一个类,就无法直接extends Thread,此时,可以实现一个Runnable接口,如下:
// 实现Runnable接口的类将被Thread执行,表示一个基本的任务 public interface Runnable { // run方法就是它所有的内容,就是实际执行的任务 public abstract void run(); }
public class MyThread implements Runnable { public void run() { System.out.println("关注一角钱技术,获取Java架构资料"); } }为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:
MyThread myThread = new MyThread(); Thread thread = new Thread(myThread); thread.start();事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:
public void run() { if (target != null) { target.run(); } }实现Callable接口通过FutureTask包装器来创建Thread线程
Callable接口(也只有一个方法)定义如下:
public interface Callable
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,云南idc服务商同时它执行任务后带有返回内容 public class SomeCallable
使用ExecutorService、Callable、Future实现有返回结果的线程
ExecutorService、Callable、Future三个接口实际上都是属于Executor框架。返回结果的线程是在JDK1.5中引入的新特征,有了这种特征就不需要再为了得到返回值而大费周折了。而且自己实现了也可能漏洞百出。(下部分来讲线程池了)
可返回值的任务必须实现Callable接口。 类似的,无返回值的任务必须实现Runnable接口。执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。
注意:get方法是阻塞的,即:线程无返回结果,get方法会一直等待。再结合线程池接口ExecutorService就可以实现传说中有返回结果的多线程了。
下面提供了一个完整的有返回结果的多线程测试例子。代码如下:
package com.niuh.thread.v4; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * <p> * 使用ExecutorService、Callable、Future实现有返回结果的线程 * </p> */ public class MyThread { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(("----程序开始运行----")); Date date1 = new Date(); int taskSize = 5; // 创建一个线程池 ExecutorService pool = Executors.newFixedThreadPool(taskSize); // 创建多个有返回值的任务 List<Future> list = new ArrayList<Future>(); for (int i = 0; i < taskSize; i++) { Callable c = new MyCallable(i + " "); // 执行任务并获取Future对象 Future f = pool.submit(c); // System.out.println(">>>" + f.get().toString()); list.add(f); } // 关闭线程池 pool.shutdown(); // 获取所有并发任务的运行结果 for (Future f : list) { // 从Future对象上获取任务的返回值,并输出到控制台 System.out.println(">>>" + f.get().toString()); } Date date2 = new Date(); System.out.println("----程序结束运行----,程序运行时间【" + (date2.getTime() - date1.getTime()) + "毫秒】"); } } class MyCallable implements Callable<Object> { private String taskNum; MyCallable(String taskNum) { this.taskNum = taskNum; } public Object call() throws Exception { System.out.println(">>>" + taskNum + "任务启动"); Date dateTmp1 = new Date(); Thread.sleep(1000); Date dateTmp2 = new Date(); long time = dateTmp2.getTime() - dateTmp1.getTime(); System.out.println(">>>" + taskNum + "任务终止"); return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】"; } }协程
协程(纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程不需要上下文切换)。
协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。
协程的目的:当我们在使用多线程的时候,如果存在长时间的I/O操作。这个时候线程一直处于阻塞状态,如果线程很多的时候,会存在很多线程处于空闲状态,造成了资源应用不彻底。相对的协程不一样了,在单线程中多个任务来回执行如果出现长时间的I/O操作,让其让出目前的协程调度,执行下一个任务。当然可能所有任务,全部卡在同一个点上,但是这只是针对于单线程而言,当所有数据正常返回时,会同时处理当前的I/O操作。
Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar
<dependency> <groupId>co.paralleluniverse</groupId> <artifactId>quasar-core</artifactId> <version>0.8.0</version> <classifier>jdk8</classifier> </dependency>线程池
“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此 Java 中提供线程池对线程进行同一分配、调优和监控。
线程池介绍
在web开发中,服务器需要接受并处理请求,所以会为一个请求分配一个线程来进行处理。如果每次请求都创建一个线程的话实现起来非常简单,但是存在一个问题:如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
这就是线程池的目的。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到多个任务上。
什么时候使用线程池?
单个任务处理时间比较短; 需要处理的任务数量很大。线程池优势
重用存在的线程。减少线程黄金、消亡的开销,提高性能; 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行; 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行同一的分配、调优和监控。Executor框架
Executor接口是线程池框架中最基础的部分,定义来一个用于执行 Runnable 的 execute 方法。下面为它的继承与实现

ExecutorService接口
从图中可以看出 Executor 下有一个重要的子接口 ExecutorService ,其中定义来线程池的具体行为

AbstractExcutorService抽象类
此类的定义并没有特殊的意义仅仅是实现了ExecutorService接口

线程池的具体实现

ThreadPoolExecutor
线程池重点属性
//用来标记线程池状态(高3位),线程个数(低29位) //默认是RUNNING状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数, private static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
ctl相关方法
runStateOf:获取运行状态; workerCountOf:获取活动线程数; ctlOf:获取运行状态和活动线程数的值。 / 获取高三位 运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位 线程个数 private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl新值,线程状态 与 线程个数 private static int ctlOf(int rs, int wc) { return rs | wc; }线程池存在5种状态
//运行中 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //关闭 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //整理 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //终止 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;使用一个整形,前3位表示状态,后29位表示线程容量,也就是说线程最多有 230−1 个

也可以看出当ctl小于零表示线程池仍在运行
RUNNING
状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!SHUTDOWN
状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。STOP
状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。TIDYING
状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。TERMINATED
状态说明:线程池彻底终止,就变成TERMINATED状态。 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。进入TERMINATED的条件如下:
线程池不是RUNNING状态; 线程池状态不是TIDYING状态或TERMINATED状态; 如果线程池状态是SHUTDOWN并且workerQueue为空; workerCount为0; 设置TIDYING状态成功。
线程池参数
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTim
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出异常,默认策略; CallerRunsPolicy:用调用者所在的线程来执行任务; DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; DiscardPolicy:直接丢弃任务;上面的4种策略都是ThreadPoolExecutor的内部类。

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
线程池的创建
有四个构造函数,其他三个都是调用下面代码中的这个构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)线程池监控
public long getTaskCount() //线程池已执行与未执行的任务总数 public long getCompletedTaskCount() //已完成的任务数 public int getPoolSize() //线程池当前的线程数 public int getActiveCount() //线程池中正在执行任务的线程数量线程池原理

核心方法分析
由于篇幅有限,核心方法解析请阅读文末的扩展链接。
PS:以上代码提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git
很赞哦!(3335)