当我们要并发处理一些任务的时候,需要使用到多线程技术。一般是通过new Thread()来新建一个线程实现并发执行,但是这样做其实有不少的弊端:主要是因为频繁进行线程的创建和销毁非常耗费性能和资源,一个线程大费周章的创建后,可能只是简单执行一些任务后就马上大费周章的销毁,做不到复用。同时如果短时间内大量的任务到来,线程大量的创建,此时系统会出现性能的瓶颈,甚至占用过多资源而阻塞系统或oom等状况,从而降低系统的稳定性。因此使用线程池是更常用的方法,使用线程池的好处能够很好的解决上面面临的问题:降低系统资源消耗、提高系统响应速度、方便线程的管理。

一般来说线程池的创建和使用有三种方式:一种是使用juc包下的Executors 工厂类,另一种是直接new一个ThreadPoolExecutor实例,其实juc包下的Executors 工厂类底层也是new的ThreadPoolExecutor实例来实现的。还有一种是使用Spring封装的线程池,底层也是对ThreadPoolExecutor的封装。

Java线程池的核心参数

线程池在创建的时候需要指定7个参数,分别是:

  1. corePoolSize:核心线程数,线程池中长期存活的线程数量,可以传一个Integer类型的值。

  2. maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。当线程池中的长期存活的线程都在忙的时候,可以再临时创建线程来执行这个任务,但是新创建的临时线程也是有上限的,线程池中的长期线程+临时的线程<=maximumPoolSize,可以传一个Integer类型的值,且不能小于corePoolSize

  3. keepAliveTime:空闲线程存活时间。当临时创建的线程执行完任务之后,空闲了keepAliveTime的时间就会被回收释放,传入的是一个Long类型的值。

  4. TimeUnit:时间单位,表示上面的keepAliveTime的时间单位。传入的是一个枚举类型。通常取值:

    • TimeUnit.NANOSECONDS;纳秒
    • TimeUnit.MICROSECONDS;微秒
    • TimeUnit.MILLISECONDS;毫秒
    • TimeUnit.SECONDS;
    • TimeUnit.MINUTES;
    • TimeUnit.HOURS;
    • TimeUnit.DAYS;
  5. BlockingQueue:线程池任务队列。这是一个阻塞队列,用于存储等待执行的任务。当线程池中的线程都在忙碌时,新提交的任务会被放入队列中等待执行。可传的值可以是不同类型的阻塞队列:

    • new ArrayBlockingQueue<Runnable/Callable>(): ArrayBlockingQueue,一个由数组结构组成的有界阻塞队列
    • new LinkedBlockingQueue<Runnable/Callable>(): ``LinkedBlockingQueue,一个由链表结构组成的有界阻塞队列
    • new SynchronousQueue<Runnable/Callable>(): SynchronousQueue,一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
    • new PriorityBlockingQueue<Runnable/Callable>(): PriorityBlockingQueue,一个支持优先级排序的无界阻塞队列
    • new DelayQueue<Runnable/Callable>(): DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
    • new LinkedTransferQueue<Runnable/Callable>(): LinkedTransferQueue,一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
    • new LinkedBlockingDeque<Runnable/Callable>(): LinkedBlockingDeque,一个由链表结构组成的双向阻塞队列

    比较常用的是LinkedBlockingQueue,线程池的排队策略和BlockingQueue 息息相关。

  6. ThreadFactory:创建线程的工厂。用于创建新的线程。通常情况下,可以使用 Executors.defaultThreadFactory()来使用默认的线程工厂,也可以自定义线程工厂以满足特定需求。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    //自定义线程工厂类
    //实现ThreadFactory接口,并重写里面的newThread方法
    public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
    // 创建并返回自定义的线程对象
    Thread thread = new Thread(r);

    // 设置线程名称
    thread.setName("Thread-" + r.hashCode());
    // 设置线程优先级(最大值:10)
    thread.setPriority(Thread.MAX_PRIORITY);
    // 设置线程的守护状态
    thread.setDaemon(false);

    // 返回新线程
    return thread;
    }
    }

    //或者使用lambda表达式完成传参
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 3000l, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), r -> {
    // 创建并返回自定义的线程对象
    Thread thread = new Thread(r);
    // 设置线程名称
    thread.setName("Thread-" + r.hashCode());
    // 设置线程优先级(最大值:10)
    thread.setPriority(Thread.MAX_PRIORITY);
    // 设置线程的守护状态
    thread.setDaemon(false);
    // 返回新线程
    return thread;
    });
  7. RejectedExecutionHandler:拒绝策略。当线程池已经达到最大线程数并且任务队列也已满时,新提交的任务会被拒绝执行。这个参数用于定义在出现这种情况时应采取的策略。可传的值通常是预定义的策略,如:

    • ThreadPoolExecutor.AbortPolicy:拒绝并抛出异常。(默认为这种)
    • ThreadPoolExecutor.CallerRunsPolicy:使用当前调用的线程来执行此任务。
    • ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
    • ThreadPoolExecutor.DiscardPolicy:忽略并抛弃当前任务。

Java线程池的使用

在了解了线程池的7个参数之后我么就可以创建一个线程池,使用它了。下面分别是上面提到的3中创建使用的方式。

juc包下的Executors 工厂类

使用工具类Executors生产一个线程池,最常用的线程池(三个静态方法)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 特点:
// 1.维护了一个可自动扩容的线程容器,每当需要执行一个新任务时,有活跃的线程就使用该线程,否则就新建一条线程。
// 2.线程池的最大线程数为Integer.MAX_VALUE,基本上不受限制。
// 3.使用完毕后的线程会回归线程池,如果这个线程在60秒之后依旧空闲,那么就会被移除
// 4.线程池初始线程个数为0
// 适用于线程生命周期短且频繁创建的场景
ExecutorService newCachedThreadPool()

// 特点:
// 1.创建一个固定线程数量的线程池,在整个线程池的生命周期中,线程池中的数量不会变化
// 2.维护一个无界队列(暂存已提交的来不及执行的任务)按照任务的提交顺序,将任务执行完毕
// 这种类型的线程池适用于执行长时任务或需要限制并发数量的场景。
ExecutorService newFixedThreadPool(int nThreads)

// 特点:
// 1.线程数量固定为1个,在整个线程池的生命周期中,线程池中的数量不会变化
// 2.维护了一个无界队列(暂存已提交的来不及执行的任务)按照任务的提交顺序,将任务执行完毕
// 这种类型的线程池适用于需要保证任务顺序执行,不需要同时执行的场景
ExecutorService newSingleThreadExecutor()

其中无界队列指的是一个不确定长度的可自由扩容的队列,遵循”先进先出”的特点,理论上可以容量无限的任务。
使用以上方法就可以得到一个具有相应特点的线程池对象,具体情况使用哪个线程池要具体分析。
上面通过Executors创建的线程池,返回的是一个实现了ExecutorService接口的线程池,我们通常使用这个接口来承接返回的线程池,我们可以通过这个接口中的一系列方法来操作管理线程池,比如提交任务,关闭线程池等。
1
2
3
4
5
6
7
8
9
10
11
12
//提交一个Runnable类型的任务(实现了Runnable接口,并重写了run方法的任务),任务的执行没有返回值,可以不接返回值
Future<?> submit(Runnable task);

//提交一个Callable类型的任务(实现了Callable接口,并重写而来call方法的任务),任务的执行有返回值,返回值封装在一个Future<T>类型的值中,通过其中的get()方法获得
Future<T> submit(Callable<T> task);

// 执行完任务列表中所有已提交的任务,关闭线程池,不再接受新任务。
void shutdown()

// 立刻停止所有正在执行的活动任务,也不再处理任务列表中等待的任务,并直接返回等待执行的任务列表。
List<Runnable> shutdownNow()

new ThreadPoolExecutor实例

上面我们使用了工具类java.util.concurrent.Executors的静态方法来获取一个线程池对象,但从实际的开发中讲,这种做法是错误的。比如阿里巴巴Java开发规范手册中,就明确禁止用这种方式来新建线程池。
原因是什么呢?
主要有以下三个原因:

  • 无界队列。Executors.newFixedThreadPool固定容量线程池和Executors.newSingleThreadExecutor单个容量线程池都会创建一个无界的队列来支持任务提交。也就是说,只要不停的往线程池提交任务,这个任务队列就会不断变长。如果因一些意外问题导致任务处理速度完全跟不上提交速度,程序有堆溢出(OutOfMemoryError)的风险。
  • 无限的线程对象。Executors.newCachedThreadPool会创建一个线程数量可以无限制增长的线程池,实际上该线程池的线程容量最大值是Integer.MAX_VALUE。这显然也是很有风险的事情,可能浪费大量线程资源,影响程序的效率。
  • 配置不灵活问题:Executors工具类提供的方法创建的线程池,其线程数、队列策略等参数都是固定的,无法根据系统的实际需要进行调整。实际上,线程池的配置应该根据硬件条件,实际任务需求等灵活调整。

所以,对于生产环境而言,禁止直接使用Executors工具类提供的方法创建线程池。而推荐直接通过ThreadPoolExecutor来创建线程池实例,或者使用Spring框架封装好的线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//使用ThreadPoolExecutor手动创建线程池,并提交任务
ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(20,
50,
600L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(200),
r -> {
// 创建并返回自定义的线程对象
Thread thread = new Thread(r);
// 设置线程名称
thread.setName("Thread-" + r.hashCode());
// 设置线程优先级(最大值:10)
thread.setPriority(Thread.MAX_PRIORITY);
// 设置线程的守护状态
thread.setDaemon(false);
// 返回新线程
return thread;
},
new ThreadPoolExecutor.AbortPolicy());

//提交任务
//提交Runnable任务
threadPoolExecutor1.submit(() -> System.out.println("你好朋友!"));
//提交Callable任务
System.out.println(threadPoolExecutor1.submit(() -> "你好呀朋友!!").get());

SpringBoot配置使用线程池

Spring框架也基于ThreadPoolExecutor封装了一个线程池,在Spring框架的应用中的使用上更加简便。

  1. 使用SpringBoot默认自带的线程池
    • 在启动类上加上@EnableAsync
    • 在需要异步处理的方法上加上@Async注解即可开启该方法的异步处理。
  2. 直接注入ThreadPoolTaskExecutor组件,该组件是Spring框架在加载容器的时候自动创建并注册到容器中的。可以直接使用这个组件往Spring自带的线程池中提交任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Service
    class ThreadPoolApplicationTestsServiceImpl implements ThreadPoolApplicationTestsService {
    //直接注入即可使用
    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    void contextLoads() throws InterruptedException {
    //往Spring自带的线程池中提交任务,获得返回结果,并打印
    System.out.println(threadPoolTaskExecutor.submit(() -> "你好呀小朋友!!");
    Thread.sleep(1000);
    //往Spring自带的线程池中提交任务,获得返回结果,并打印
    System.out.println(threadPoolTaskExecutor.submit(() -> "你好呀大朋友!!");
    }
    }
  3. 自己配置一个线程池。这里利用SpringBoot的自动装配的功能,配置一个自定义的线程池,并注册到容器中,供其他的组件注入使用。
    1. 在配置文件中配置好相应的配置:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      #  task.pool.corePoolSize=50
      # task.pool.maxPoolSize=100
      # task.pool.keepAliveSeconds=3000
      # task.pool.queueCapacity=100
      task:
      pool:
      core-size: 50
      max-size: 100
      keep-alive: 3000
      queue-capacity: 100
    2. 写一个读取配置文件中相应配置的参数配置类
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      package com.tzjw.marketweixin.config;

      import lombok.Data;
      import org.springframework.boot.context.properties.ConfigurationProperties;

      /**
      * 线程池参数类
      *
      * @author Xiaobin
      * @since 2023/09/21 09:55
      */
      @ConfigurationProperties(prefix = "task.pool")
      @Data
      public class ThreadPoolProperties {
      private int coreSize;

      private int maxSize;

      private int keepAlive;

      private int queueCapacity;
      }
    3. 写一个线程池配置类
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      package com.tzjw.marketweixin.config;

      import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.context.properties.EnableConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.scheduling.annotation.AsyncConfigurer;
      import org.springframework.scheduling.annotation.EnableAsync;
      import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
      import java.util.concurrent.Executor;
      import java.util.concurrent.ThreadPoolExecutor;

      /**
      * 线程池配置类
      *
      * @author Xiaobin
      * @since 2023/09/21 09:59
      */
      @Configuration
      @EnableAsync
      @EnableConfigurationProperties(ThreadPoolProperties.class)
      public class TaskExecutePool{

      @Autowired
      private ThreadPoolProperties threadPoolProperties;

      /**
      * 修改默认线程池的配置,将自定义的线程池注册到容器中
      *
      * @return ThreadPoolTaskExecutor
      */
      @Bean
      public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
      // 核心线程数
      executor.setCorePoolSize(threadPoolProperties.getCoreSize());
      // 最大线程数
      executor.setMaxPoolSize(threadPoolProperties.getMaxSize());
      // 非核心线程活跃时间
      executor.setKeepAliveSeconds(threadPoolProperties.getKeepAlive());
      // 队列容量
      executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
      // 设置线程的前缀名
      executor.setThreadNamePrefix("market");
      // 设置拒绝策略
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      // 是否在任务执行完后关闭线程池
      executor.setWaitForTasksToCompleteOnShutdown(false);
      //初始化
      executor.initialize();
      return executor;
      }
      }
    4. 到这里我们已将自定义的线程池注册到容器中,在容器的组件中注入该线程池就可以往线程池中提交任务并行处理。为了方便往线程池中提交任务,我们还可以写一个工具类,注册到容器中,简化提交任务。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      package com.tzjw.marketweixin.util;

      import com.tzjw.marketweixin.config.TaskExecutePool;
      import lombok.Data;
      import lombok.SneakyThrows;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.core.task.TaskExecutor;
      import org.springframework.scheduling.annotation.Async;
      import org.springframework.scheduling.annotation.AsyncResult;
      import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
      import org.springframework.stereotype.Component;

      import java.util.concurrent.*;
      import java.util.concurrent.Callable;
      import java.util.concurrent.Future;

      /**
      * 使用线程池的工具类
      *
      * @author Xiaobin
      * @since 2023/09/21 10:25
      */
      @Component
      public class SpringThreadPoolUtil {

      @Autowired
      ThreadPoolTaskExecutor taskExecutor;

      /**
      * 往线程池中提交任务
      * @param task
      * @return java.util.concurrent.Future<T>
      * @author Xiaobin
      * @since 2023/09/21 16:46
      */
      public <T> Future<T> submitTask(Callable<T> task) {
      try {
      return taskExecutor.submit(task);
      } catch (Exception e) {
      throw new RuntimeException("Error submitting task to the thread pool.", e);
      }
      }
      }
    5. 通过上面的封装之后,我们后续只需要在项目中引入SpringThreadPoolUtil这个组件,调用其中的submitTask方法,传入一个Callable类型的任务就可以向线程池中提交任务,并获得返回值。具体的使用实例如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      @Service
      public class HomeIndexServiceImpl implements HomeIndexService {
      //注入工具类对象
      @Autowired
      SpringThreadPoolUtil springThreadPoolUtil;

      public Map<String, Object> getIndexData() {
      // 使用线程池提交数据库查询任务
      Future<List> adFuture = springThreadPoolUtil.submitTask(() -> {
      MarketAdExample marketAdExample = new MarketAdExample();
      marketAdExample.setOrderByClause("add_time desc");
      marketAdExample.createCriteria().andDeletedEqualTo(false);
      return marketAdMapper.selectByExample(new MarketAdExample());
      });
      //解析
      List<MarketAd> = adFuture.get();
      }
      }