线程池的创建与使用
当我们要并发处理一些任务的时候,需要使用到多线程技术。一般是通过new Thread()
来新建一个线程实现并发执行,但是这样做其实有不少的弊端:主要是因为频繁进行线程的创建和销毁非常耗费性能和资源,一个线程大费周章的创建后,可能只是简单执行一些任务后就马上大费周章的销毁,做不到复用。同时如果短时间内大量的任务到来,线程大量的创建,此时系统会出现性能的瓶颈,甚至占用过多资源而阻塞系统或oom
等状况,从而降低系统的稳定性。因此使用线程池是更常用的方法,使用线程池的好处能够很好的解决上面面临的问题:降低系统资源消耗、提高系统响应速度、方便线程的管理。
一般来说线程池的创建和使用有三种方式:一种是使用juc
包下的Executors
工厂类,另一种是直接new一个ThreadPoolExecutor
实例,其实juc
包下的Executors
工厂类底层也是new的ThreadPoolExecutor
实例来实现的。还有一种是使用Spring
封装的线程池,底层也是对ThreadPoolExecutor
的封装。
Java线程池的核心参数
线程池在创建的时候需要指定7个参数,分别是:
corePoolSize
:核心线程数,线程池中长期存活的线程数量,可以传一个Integer
类型的值。maximumPoolSize
:最大线程数,线程池允许创建的最大线程数量。当线程池中的长期存活的线程都在忙的时候,可以再临时创建线程来执行这个任务,但是新创建的临时线程也是有上限的,线程池中的长期线程+临时的线程<=maximumPoolSize
,可以传一个Integer
类型的值,且不能小于corePoolSize
。keepAliveTime
:空闲线程存活时间。当临时创建的线程执行完任务之后,空闲了keepAliveTime
的时间就会被回收释放,传入的是一个Long
类型的值。TimeUnit
:时间单位,表示上面的keepAliveTime
的时间单位。传入的是一个枚举类型。通常取值:TimeUnit.NANOSECONDS;
纳秒TimeUnit.MICROSECONDS;
微秒TimeUnit.MILLISECONDS;
毫秒TimeUnit.SECONDS;
秒TimeUnit.MINUTES;
分TimeUnit.HOURS;
时TimeUnit.DAYS;
天
BlockingQueue
:线程池任务队列。这是一个阻塞队列,用于存储等待执行的任务。当线程池中的线程都在忙碌时,新提交的任务会被放入队列中等待执行。可传的值可以是不同类型的阻塞队列:new ArrayBlockingQueue<Runnable/Callable>():
ArrayBlockingQueue
,一个由数组结构组成的有界阻塞队列。new LinkedBlockingQueue<Runnable/Callable>(): ``LinkedBlockingQueue
,一个由链表结构组成的有界阻塞队列。new SynchronousQueue<Runnable/Callable>():
SynchronousQueue
,一个不存储元素的阻塞队列,即直接提交给线程不保持它们。new PriorityBlockingQueue<Runnable/Callable>():
PriorityBlockingQueue
,一个支持优先级排序的无界阻塞队列。new DelayQueue<Runnable/Callable>():
DelayQueue
:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。new LinkedTransferQueue<Runnable/Callable>():
LinkedTransferQueue
,一个由链表结构组成的无界阻塞队列。与SynchronousQueue
类似,还含有非阻塞方法。new LinkedBlockingDeque<Runnable/Callable>():
LinkedBlockingDeque
,一个由链表结构组成的双向阻塞队列。
比较常用的是
LinkedBlockingQueue
,线程池的排队策略和BlockingQueue
息息相关。ThreadFactory
:创建线程的工厂。用于创建新的线程。通常情况下,可以使用Executors.defaultThreadFactory()
来使用默认的线程工厂,也可以自定义线程工厂以满足特定需求。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33//自定义线程工厂类
//实现ThreadFactory接口,并重写里面的newThread方法
public class MyThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
// 创建并返回自定义的线程对象
Thread thread = new Thread(r);
// 设置线程名称
thread.setName("Thread-" + r.hashCode());
// 设置线程优先级(最大值:10)
thread.setPriority(Thread.MAX_PRIORITY);
// 设置线程的守护状态
thread.setDaemon(false);
// 返回新线程
return thread;
}
}
//或者使用lambda表达式完成传参
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 3000l, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), r -> {
// 创建并返回自定义的线程对象
Thread thread = new Thread(r);
// 设置线程名称
thread.setName("Thread-" + r.hashCode());
// 设置线程优先级(最大值:10)
thread.setPriority(Thread.MAX_PRIORITY);
// 设置线程的守护状态
thread.setDaemon(false);
// 返回新线程
return thread;
});RejectedExecutionHandler
:拒绝策略。当线程池已经达到最大线程数并且任务队列也已满时,新提交的任务会被拒绝执行。这个参数用于定义在出现这种情况时应采取的策略。可传的值通常是预定义的策略,如:ThreadPoolExecutor.AbortPolicy
:拒绝并抛出异常。(默认为这种)ThreadPoolExecutor.CallerRunsPolicy
:使用当前调用的线程来执行此任务。ThreadPoolExecutor.DiscardOldestPolicy
:抛弃队列头部(最旧)的一个任务,并执行当前任务。ThreadPoolExecutor.DiscardPolicy
:忽略并抛弃当前任务。
Java线程池的使用
在了解了线程池的7个参数之后我么就可以创建一个线程池,使用它了。下面分别是上面提到的3中创建使用的方式。
juc
包下的Executors
工厂类
使用工具类Executors
生产一个线程池,最常用的线程池(三个静态方法)如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// 特点:
// 1.维护了一个可自动扩容的线程容器,每当需要执行一个新任务时,有活跃的线程就使用该线程,否则就新建一条线程。
// 2.线程池的最大线程数为Integer.MAX_VALUE,基本上不受限制。
// 3.使用完毕后的线程会回归线程池,如果这个线程在60秒之后依旧空闲,那么就会被移除
// 4.线程池初始线程个数为0
// 适用于线程生命周期短且频繁创建的场景
ExecutorService newCachedThreadPool()
// 特点:
// 1.创建一个固定线程数量的线程池,在整个线程池的生命周期中,线程池中的数量不会变化
// 2.维护一个无界队列(暂存已提交的来不及执行的任务)按照任务的提交顺序,将任务执行完毕
// 这种类型的线程池适用于执行长时任务或需要限制并发数量的场景。
ExecutorService newFixedThreadPool(int nThreads)
// 特点:
// 1.线程数量固定为1个,在整个线程池的生命周期中,线程池中的数量不会变化
// 2.维护了一个无界队列(暂存已提交的来不及执行的任务)按照任务的提交顺序,将任务执行完毕
// 这种类型的线程池适用于需要保证任务顺序执行,不需要同时执行的场景
ExecutorService newSingleThreadExecutor()
其中无界队列指的是一个不确定长度的可自由扩容的队列,遵循”先进先出”的特点,理论上可以容量无限的任务。
使用以上方法就可以得到一个具有相应特点的线程池对象,具体情况使用哪个线程池要具体分析。
上面通过Executors
创建的线程池,返回的是一个实现了ExecutorService
接口的线程池,我们通常使用这个接口来承接返回的线程池,我们可以通过这个接口中的一系列方法来操作管理线程池,比如提交任务,关闭线程池等。
1
2
3
4
5
6
7
8
9
10
11
12//提交一个Runnable类型的任务(实现了Runnable接口,并重写了run方法的任务),任务的执行没有返回值,可以不接返回值
Future<?> submit(Runnable task);
//提交一个Callable类型的任务(实现了Callable接口,并重写而来call方法的任务),任务的执行有返回值,返回值封装在一个Future<T>类型的值中,通过其中的get()方法获得
Future<T> submit(Callable<T> task);
// 执行完任务列表中所有已提交的任务,关闭线程池,不再接受新任务。
void shutdown()
// 立刻停止所有正在执行的活动任务,也不再处理任务列表中等待的任务,并直接返回等待执行的任务列表。
List<Runnable> shutdownNow()
new ThreadPoolExecutor
实例
上面我们使用了工具类java.util.concurrent.Executors
的静态方法来获取一个线程池对象,但从实际的开发中讲,这种做法是错误的。比如阿里巴巴Java
开发规范手册中,就明确禁止用这种方式来新建线程池。
原因是什么呢?
主要有以下三个原因:
- 无界队列。
Executors.newFixedThreadPool
固定容量线程池和Executors.newSingleThreadExecutor
单个容量线程池都会创建一个无界的队列来支持任务提交。也就是说,只要不停的往线程池提交任务,这个任务队列就会不断变长。如果因一些意外问题导致任务处理速度完全跟不上提交速度,程序有堆溢出(OutOfMemoryError)
的风险。 - 无限的线程对象。
Executors.newCachedThreadPool
会创建一个线程数量可以无限制增长的线程池,实际上该线程池的线程容量最大值是Integer.MAX_VALUE
。这显然也是很有风险的事情,可能浪费大量线程资源,影响程序的效率。 - 配置不灵活问题:
Executors
工具类提供的方法创建的线程池,其线程数、队列策略等参数都是固定的,无法根据系统的实际需要进行调整。实际上,线程池的配置应该根据硬件条件,实际任务需求等灵活调整。
所以,对于生产环境而言,禁止直接使用Executors
工具类提供的方法创建线程池。而推荐直接通过ThreadPoolExecutor
来创建线程池实例,或者使用Spring
框架封装好的线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//使用ThreadPoolExecutor手动创建线程池,并提交任务
ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(20,
50,
600L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(200),
r -> {
// 创建并返回自定义的线程对象
Thread thread = new Thread(r);
// 设置线程名称
thread.setName("Thread-" + r.hashCode());
// 设置线程优先级(最大值:10)
thread.setPriority(Thread.MAX_PRIORITY);
// 设置线程的守护状态
thread.setDaemon(false);
// 返回新线程
return thread;
},
new ThreadPoolExecutor.AbortPolicy());
//提交任务
//提交Runnable任务
threadPoolExecutor1.submit(() -> System.out.println("你好朋友!"));
//提交Callable任务
System.out.println(threadPoolExecutor1.submit(() -> "你好呀朋友!!").get());
SpringBoot
配置使用线程池
Spring
框架也基于ThreadPoolExecutor
封装了一个线程池,在Spring
框架的应用中的使用上更加简便。
- 使用
SpringBoot
默认自带的线程池- 在启动类上加上
@EnableAsync
- 在需要异步处理的方法上加上
@Async
注解即可开启该方法的异步处理。
- 在启动类上加上
- 直接注入
ThreadPoolTaskExecutor
组件,该组件是Spring
框架在加载容器的时候自动创建并注册到容器中的。可以直接使用这个组件往Spring
自带的线程池中提交任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ThreadPoolApplicationTestsServiceImpl implements ThreadPoolApplicationTestsService {
//直接注入即可使用
ThreadPoolTaskExecutor threadPoolTaskExecutor;
void contextLoads() throws InterruptedException {
//往Spring自带的线程池中提交任务,获得返回结果,并打印
System.out.println(threadPoolTaskExecutor.submit(() -> "你好呀小朋友!!");
Thread.sleep(1000);
//往Spring自带的线程池中提交任务,获得返回结果,并打印
System.out.println(threadPoolTaskExecutor.submit(() -> "你好呀大朋友!!");
}
} - 自己配置一个线程池。这里利用
SpringBoot
的自动装配的功能,配置一个自定义的线程池,并注册到容器中,供其他的组件注入使用。- 在配置文件中配置好相应的配置:
1
2
3
4
5
6
7
8
9
10# task.pool.corePoolSize=50
# task.pool.maxPoolSize=100
# task.pool.keepAliveSeconds=3000
# task.pool.queueCapacity=100
task:
pool:
core-size: 50
max-size: 100
keep-alive: 3000
queue-capacity: 100 - 写一个读取配置文件中相应配置的参数配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22package com.tzjw.marketweixin.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 线程池参数类
*
* @author Xiaobin
* @since 2023/09/21 09:55
*/
public class ThreadPoolProperties {
private int coreSize;
private int maxSize;
private int keepAlive;
private int queueCapacity;
} - 写一个线程池配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54package com.tzjw.marketweixin.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置类
*
* @author Xiaobin
* @since 2023/09/21 09:59
*/
public class TaskExecutePool{
private ThreadPoolProperties threadPoolProperties;
/**
* 修改默认线程池的配置,将自定义的线程池注册到容器中
*
* @return ThreadPoolTaskExecutor
*/
public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(threadPoolProperties.getCoreSize());
// 最大线程数
executor.setMaxPoolSize(threadPoolProperties.getMaxSize());
// 非核心线程活跃时间
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAlive());
// 队列容量
executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
// 设置线程的前缀名
executor.setThreadNamePrefix("market");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 是否在任务执行完后关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(false);
//初始化
executor.initialize();
return executor;
}
} - 到这里我们已将自定义的线程池注册到容器中,在容器的组件中注入该线程池就可以往线程池中提交任务并行处理。为了方便往线程池中提交任务,我们还可以写一个工具类,注册到容器中,简化提交任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.tzjw.marketweixin.util;
import com.tzjw.marketweixin.config.TaskExecutePool;
import lombok.Data;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* 使用线程池的工具类
*
* @author Xiaobin
* @since 2023/09/21 10:25
*/
public class SpringThreadPoolUtil {
ThreadPoolTaskExecutor taskExecutor;
/**
* 往线程池中提交任务
* @param task
* @return java.util.concurrent.Future<T>
* @author Xiaobin
* @since 2023/09/21 16:46
*/
public <T> Future<T> submitTask(Callable<T> task) {
try {
return taskExecutor.submit(task);
} catch (Exception e) {
throw new RuntimeException("Error submitting task to the thread pool.", e);
}
}
} - 通过上面的封装之后,我们后续只需要在项目中引入
SpringThreadPoolUtil
这个组件,调用其中的submitTask
方法,传入一个Callable
类型的任务就可以向线程池中提交任务,并获得返回值。具体的使用实例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HomeIndexServiceImpl implements HomeIndexService {
//注入工具类对象
SpringThreadPoolUtil springThreadPoolUtil;
public Map<String, Object> getIndexData() {
// 使用线程池提交数据库查询任务
Future<List> adFuture = springThreadPoolUtil.submitTask(() -> {
MarketAdExample marketAdExample = new MarketAdExample();
marketAdExample.setOrderByClause("add_time desc");
marketAdExample.createCriteria().andDeletedEqualTo(false);
return marketAdMapper.selectByExample(new MarketAdExample());
});
//解析
List<MarketAd> = adFuture.get();
}
}
- 在配置文件中配置好相应的配置: