【死记硬背】
1 理解
线程是稀缺资源,它的创建和销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态切换,为避免资源过度消耗需要设法重用线程执行多个任务。线程池就是一个线程缓存,负责对线程进行统一分配、调度和监控。
2 线程池的作用
* 限定线程的个数,不会导致由于线程过多导致系统运行缓慢或崩溃。
* 线程池不需要每次都去创建和销毁,节约了资源,并且响应速度快。
3 什么时候用线程池
* 单个任务处理的时间比较短。
* 需要处理的任务数量很大。
4 如何设置线程池的大小
* CPU密集型任务:
CPU个数+1的线程数
* IO密集型任务:
两倍CPU个数+1的线程数
5 线程池的组成
一般的线程池主要由4部分组成:
5.1 线程池管理器:用于创建并管理线程池。
5.2 工作线程:线程池中的线程。
5.3 任务接口:每个任务必须实现的接口,用于工作线程调度其运行。
5.4 任务队列:用于存放待处理的任务,提供一种缓冲机制。
6 线程池的实现方式
线程池的主要实现方式有五种:newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool、newSingleThreadExecutor和newWorkStealingPool。是通过Executor框架实现的,主要用到了Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future和FutureTask这几个类。
7 线程池涉及到的主要参数
7.1 corePoolSize:指定了线程池中的线程数量。
7.2 maximumPoolSize:指定了线程池中最大线程数量。
7.3 keepAliveTime:当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多少时间内会被销毁。
7.4 unit:keepAliveTime的时间单位。
7.5 workQueue:任务队列,被提交但尚未被执行的任务。
7.6 threadFactory:线程工厂,用于创建线程,一般用默认的即可。
7.7 handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。
8 拒绝策略
JDK内置的拒绝策略如下:
8.1 AbortPolicy:直接抛出异常,阻止系统正常运行。
8.2 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
8.3 DiscardOldestPolicy:丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
8.4 DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略扔无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。
9 Java线程池的工作过程
9.1 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
9.2 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池
会抛出异常 RejectExecutionException。
9.3 当一个线程完成任务时,它会从队列中取下一个任务来执行。
9.4 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
【答案解析】
五种常用线程池的实现方式:
newCachedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 它是一个可以无限扩大的线程池;
* 它比较适合处理执行时间比较小的任务;
* corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
* keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
* 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,
* 就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
*/
public class CachedThreadPoolTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i=0;i<1000;i++){
/*try{
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}*/
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在被执行");
}
});
System.out.println("执行完毕");
}
}
}
newFixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 它是一种固定大小的线程池;
* corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
* keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;
* 但这里keepAliveTime无效;
* 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
* 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
* 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效
*/
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i=0;i<10;i++){
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread().getName()+"正在执行"+index);
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
}
}
}
newScheduledThreadPool
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
* 1 scheduledAtFixedRate
* 2 scheduledWithFixedDelay
* SchduledFutureTask接收的参数:
* time:任务开始的时间
* sequenceNumber:任务的序号
* period:任务执行的时间间隔
* 它采用DelayQueue存储等待的任务
* DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
* DelayQueue也是一个无界队列;
* 工作线程的执行过程:
* 工作线程会从DelayQueue取已经到期的任务去执行;
* 执行结束后重新设置任务的到期时间,再次放回DelayQueue
*/
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
/**/
scheduledThreadPool.schedule(new Runnable(){
public void run(){
System.out.println("延迟1秒执行");
}
}, 1, TimeUnit.SECONDS);
scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("执行");
}
},1,1,TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("执行3");
}
},1,3,TimeUnit.SECONDS);
}
}
newSingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 它只会创建一条工作线程处理任务;
* 采用的阻塞队列为LinkedBlockingQueue;
*/
public class SingleThreadExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "正在被执行,打印的值是:" + index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
newWorkStealingPool
import java.util.concurrent.*;
public class WorkStealingPoolTest {
// 线程数
private static final int threads = 10;
// 用于计数线程是否执行完成
static CountDownLatch countDownLatch = new CountDownLatch(threads);
public static void main(String[] args) throws Exception{
test3();
}
public static void test1() throws Exception{
System.out.println("---- start ----");
ExecutorService executorService = Executors.newWorkStealingPool();
for (int i = 0; i < threads; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
System.out.println(e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
System.out.println("---- end ----");
}
public static void test2() throws InterruptedException{
System.out.println("---- start ----");
ExecutorService executorService = Executors.newWorkStealingPool();
for (int i = 0; i < threads; i++) {
// Callable 带返回值
executorService.submit(new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}));
}
countDownLatch.await();
System.out.println("---- end ----");
}
public static void test3() throws Exception{
System.out.println("---- start ----");
ExecutorService executorService = Executors.newWorkStealingPool();
for (int i = 0; i < threads; i++) {
// Runnable 带返回值
FutureTask> futureTask = new FutureTask<>(new Callable() {
/**
* call
* @return currentThreadName
*/
@Override
public String call() {
return Thread.currentThread().getName();
}
});
executorService.submit(new Thread(futureTask));
System.out.println(futureTask.get());
}
System.out.println("---- end ----");
}
}
【温馨提示】
点赞+收藏文章,关注我并私信回复【面试题解析】,即可100%免费领取楼主的所有面试题资料!