【并发编程】SpringBoot创建线程池的六种方式
作者:mmseoamin日期:2024-01-18
1. 自定义线程池
1.1 示例代码
/**
 * 自定义线程池
 * 

* 优点:可以自定义参数 *

*/ @Test public void newThreadPoolExecutor() { ThreadPoolExecutor executor = new ThreadPoolExecutor( // 核心线程数 3, // 最大线程数 5, // 空闲线程最大存活时间 60L, // 空闲线程最大存活时间单位 TimeUnit.SECONDS, // 等待队列及大小 new ArrayBlockingQueue<>(100), // 创建新线程时使用的工厂 Executors.defaultThreadFactory(), // 当线程池达到最大时的处理策略 // new ThreadPoolExecutor.AbortPolicy() // 抛出RejectedExecutionHandler异常 new ThreadPoolExecutor.CallerRunsPolicy() // 交由调用者的线程执行 // new ThreadPoolExecutor.DiscardOldestPolicy() // 丢掉最早未处理的任务 // new ThreadPoolExecutor.DiscardPolicy() // 丢掉新提交的任务 ); // 总共5个任务 for (int i = 1; i <= 5; i++) { int taskIndex = i; executor.execute(() -> { log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex); // 每个任务耗时1秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); }

  控制台打印:

20:09:50.032 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:09:50.032 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2
20:09:50.032 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3
20:09:51.038 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 5
20:09:51.038 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 4
2. 固定长度线程池
2.1 示例代码
/**
 * 固定大小线程池
 * 

* 优点:当任务执行较快,且任务较少时使用方便 *

*

* 风险:当处理较慢时,等待队列的任务堆积会导致OOM *

*/ @Test public void newFixThreadPool() { // 3个固定线程 ExecutorService executorService = Executors.newFixedThreadPool(3); // 总共5个任务 for (int i = 1; i <= 5; i++) { int taskIndex = i; executorService.execute(() -> { log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex); // 每个任务耗时1秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }

  控制台打印:

20:16:27.040 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2
20:16:27.040 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3
20:16:27.040 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:16:28.048 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 4
20:16:28.048 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 5

  前3个任务被同时执行,因为刚好有3个核心线程。后2个任务会被存放到阻塞队列,当执行前3个任务的某个线程空闲时会从队列中获取任务并执行。

2.2 源码剖析
/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.  At any point, at most
 * {@code nThreads} threads will be active processing tasks.
 * If additional tasks are submitted when all threads are active,
 * they will wait in the queue until a thread is available.
 * If any thread terminates due to a failure during execution
 * prior to shutdown, a new one will take its place if needed to
 * execute subsequent tasks.  The threads in the pool will exist
 * until it is explicitly {@link ExecutorService#shutdown shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue());
}

  该类型线程池的核心线程数和最大线程数为指定的参数,空闲线程的存活线程时间为0毫秒,等待队列使用LinkedBlockingQueue,初始化大小为Integer.MAX_VALUE(即:2147483647)。

  当任务执行较慢时,阻塞队列存有大量的任务等待,这些任务会占用大量的内存,从而可能导致OOM。

3. 单一线程池
3.1 示例代码
/**
 * 单一线程池
 * 

* 优势:保存任务按照提交的顺序执行 *

*

* 风险:当处理较慢时,等待队列的任务堆积会导致OOM *

*/ @Test public void newSingleThreadExecutor() { // 1个线程 ExecutorService executor = Executors.newSingleThreadExecutor(); // 总共5个任务 for (int i = 1; i <= 5; i++) { int taskIndex = i; executor.execute(() -> { log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex); // 每个任务耗时1秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); }

  控制台打印:

20:31:04.970 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:31:05.974 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 2
20:31:06.974 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 3
20:31:07.975 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 4
20:31:08.976 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 5

  所有任务按照提交的顺序执行。

3.2 源码剖析
/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue()));
}

  该类型线程池的核心线程数和最大线程数都为1,空闲线程的存活线程时间为0毫秒,等待队列使用LinkedBlockingQueue,初始化大小为Integer.MAX_VALUE(即:2147483647)。

  当任务执行较慢时,阻塞队列存有大量的任务等待,这些任务会占用大量的内存,从而可能导致OOM。

4. 共享线程池
4.1 示例代码
/**
 * 共享线程池
 * 

* 优势:当在某一时间段内任务较多,且执行较快时方便使用 *

*

* 风险:当处理较慢时,会创建大量的线程 *

*/ @Test public void newCachedThreadPool() { ExecutorService executor = Executors.newCachedThreadPool(); // 总共5个任务 for (int i = 1; i <= 5; i++) { int taskIndex = i; executor.execute(() -> { log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex); // 每个任务耗时1秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); }

  控制台打印:

20:45:31.351 [pool-1-thread-4] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-4 正在执行任务 4
20:45:31.351 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:45:31.351 [pool-1-thread-5] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-5 正在执行任务 5
20:45:31.358 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2
20:45:31.359 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3

  每一个任务都创建了新的线程。

4.2 源码剖析
/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}

  该类型线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE(即:2147483647),空闲线程最大存活时间为60秒,等待队列使用SynchronousQueue,该队列不存储数据,只做转发,具体可参考:【并发编程】Java 阻塞队列。

  当任务较多或执行较慢时,会创建大量的线程,从而导致OOM。

5. 定时线程池
5.1 示例代码
/**
 * 定时线程池
 * 

* 优点:可以定时执行某些任务 *

*

* 风险:当处理较慢时,等待队列的任务堆积会导致OOM *

*/ @Test public void newScheduledThreadPool() { // // 单一线程 // ExecutorService executor = Executors.newSingleThreadScheduledExecutor(); // 指定核心线程数 ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); executor.schedule(() -> { log.info("3秒后开始执行,以后不再执行"); // 每个任务耗时1秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }, 3, TimeUnit.SECONDS); // // executor.scheduleAtFixedRate(() -> { // log.info("3秒后开始执行,以后每2秒执行一次"); // // // 每个任务耗时1秒 // try { // TimeUnit.SECONDS.sleep(1); // } catch (InterruptedException e) { // e.printStackTrace(); // } // }, 3, 2, TimeUnit.SECONDS); // // executor.scheduleWithFixedDelay(() -> { // log.info("3秒后开始执行,以后延迟2秒执行一次"); // // // 每个任务耗时1秒 // try { // TimeUnit.SECONDS.sleep(1); // } catch (InterruptedException e) { // e.printStackTrace(); // } // }, 3, 2, TimeUnit.SECONDS); }

  控制台打印 - 1:

21:18:46.494 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后不再执行

  启动后3秒开始执行,执行完成后不再继续执行。

  控制台打印 - 2:

21:22:47.078 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次
21:22:49.075 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次
21:22:51.075 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次

  启动后3秒开始执行,以后每两秒执行一次。

  控制台打印 - 3:

21:28:09.701 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次
21:28:12.705 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次
21:28:15.707 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次

  启动后3秒开始执行,以后每次执行时间为任务的耗时时间加固定的延迟时间。

  假设每次任务固定延迟2秒,第一次任务在第3秒开始执行,任务耗时1秒;第二次任务将在第一次完成后2秒开始执行(即第6秒),耗时2秒;第三次任务将在第二次完成后2秒开始执行(即第10秒),依次类推。

6. SpringBoot中注入异步线程池
6.1 自定义线程配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * 自定义线程池配置类
 *
 * @author CL
 */
@Configuration
public class TaskExecutorConfig {
    /**
     * 自定义任务执行器
     *
     * @return {@link TaskExecutor}
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数,默认1
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(corePoolSize);
        // 最大线程数,默认Integer.MAX_VALUE
        executor.setMaxPoolSize(corePoolSize * 2 + 1);
        // 空闲线程最大存活时间,默认60秒
        executor.setKeepAliveSeconds(3);
        // 等待队列及大小,默认Integer.MAX_VALUE
        executor.setQueueCapacity(500);
        // 线程的名称前缀,默认该Bean名称简写:org.springframework.util.ClassUtils.getShortName(java.lang.Class)
        executor.setThreadNamePrefix("custom-thread-");
        // 当线程池达到最大时的处理策略,默认抛出RejectedExecutionHandler异常
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());         // 抛出RejectedExecutionHandler异常
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    // 交由调用者的线程执行
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 丢掉最早未处理的任务
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());       // 丢掉新提交的任务
        // 等待所有任务结束后再关闭线程池,默认false
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // 等待所有任务结束最长等待时间,默认0毫秒
        executor.setAwaitTerminationSeconds(10);
        // 执行初始化
        executor.initialize();
        return executor;
    }
}
  • 在Service注入使用
    /**
     * 示例Service
     *
     * @author CL
     */
    public interface DemoService {
        /**
         * 示例方法
         *
         * @return {@link String}
         */
        void demo();
    }
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    /**
     * 示例Service实现
     *
     * @author CL
     */
    @Slf4j
    @Service
    public class DemoServiceImpl implements DemoService {
        @Resource
        private TaskExecutor taskExecutor;
        /**
         * 示例方法
         */
        @Override
        public void demo() {
            taskExecutor.execute(() -> {
                log.info("线程 " + Thread.currentThread().getName() + " 正在执行Service中的方法");
            });
        }
    }
    
    • 异步任务指定线程池
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.scheduling.annotation.Async;
      import org.springframework.scheduling.annotation.EnableAsync;
      import org.springframework.stereotype.Component;
      /**
       * 示例异步任务
       *
       * @author CL
       */
      @Slf4j
      @Component
      @EnableAsync
      public class DemoAsync {
          /**
           * 示例方法
           */
          @Async(value = "taskExecutor")
          public void demo() {
              log.info("线程 " + Thread.currentThread().getName() + " 正在执行Async中的方法");
          }
      }
      
      • 定时任务调度指定线程池
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.scheduling.TaskScheduler;
        import org.springframework.scheduling.annotation.SchedulingConfigurer;
        import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
        import org.springframework.scheduling.config.ScheduledTaskRegistrar;
        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.ThreadPoolExecutor;
        /**
         * 自定义定时任务调度配置类
         *
         * @author CL
         */
        @Configuration
        public class SheduledConfig implements SchedulingConfigurer {
            /**
             * 配置定时任务
             *
             * @param scheduledTaskRegistrar 配置任务注册器
             */
            @Override
            public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
                scheduledTaskRegistrar.setScheduler(taskScheduler());
        //        // 第二种方式
        //        scheduledTaskRegistrar.setScheduler(scheduledExecutorService());
            }
            /**
             * 自定义任务调度器
             *
             * @return {@link TaskScheduler}
             */
            @Bean
            public TaskScheduler taskScheduler() {
                ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
                executor.setPoolSize(5);
                executor.setThreadNamePrefix("custom-scheduler-");
                executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                executor.initialize();
                return executor;
            }
        //    /**
        //     * 自定义任务线程池
        //     *
        //     * @return {@link ScheduledExecutorService}
        //     */
        //    @Bean
        //    public ScheduledExecutorService scheduledExecutorService() {
        //        return Executors.newScheduledThreadPool(5);
        //    }
        }
        

        6.2 测试

        • 编写测试Controller
          import com.c3tones.async.DemoAsync;
          import com.c3tones.service.DemoService;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RestController;
          import javax.annotation.Resource;
          /**
           * 示例Controller
           *
           * @author CL
           */
          @Slf4j
          @RestController
          public class DemoController {
              @Resource
              private DemoService demoService;
              @Resource
              private DemoAsync demoAsync;
              /**
               * Service示例方法
               *
               * @return {@link String}
               */
              @RequestMapping("/service")
              public void service() {
                  log.info("Service示例方法开始执行");
                  demoService.demo();
                  log.info("Service示例方法结束执行");
              }
              /**
               * 异步示例方法
               *
               * @return {@link String}
               */
              @RequestMapping("/async")
              public void async() {
                  log.info("异步示例方法开始执行");
                  demoAsync.demo();
                  log.info("异步示例方法结束执行");
              }
          }
          
          • 启动项目
          • 测试Service中的自定义线程池
            curl http://127.0.0.1:8080/service
            

              控制台打印:

            2023-03-19 22:26:26.896  INFO 136568 --- [nio-8080-exec-3] com.c3tones.controller.DemoController    : Service示例方法开始执行
            2023-03-19 22:26:26.897  INFO 136568 --- [nio-8080-exec-3] com.c3tones.controller.DemoController    : Service示例方法结束执行
            2023-03-19 22:26:26.897  INFO 136568 --- [custom-thread-1] com.c3tones.service.DemoServiceImpl      : 线程 custom-thread-1 正在执行Service中的方法
            

              调用接口同步打印日志,自定义线程异步执行任务。

            • 测试异步任务中的自定义线程池
              curl http://127.0.0.1:8080/async
              

                控制台打印:

              2023-03-19 22:28:08.349  INFO 136568 --- [nio-8080-exec-7] com.c3tones.controller.DemoController    : 异步示例方法开始执行
              2023-03-19 22:28:08.355  INFO 136568 --- [nio-8080-exec-7] com.c3tones.controller.DemoController    : 异步示例方法结束执行
              2023-03-19 22:28:08.363  INFO 136568 --- [custom-thread-2] com.c3tones.async.DemoAsync              : 线程 custom-thread-2 正在执行Async中的方法
              

                调用接口同步打印日志,异步线程异步执行任务。

              • 测试定时任务中的自定义线程池
                • 编写测试方法
                  import lombok.extern.slf4j.Slf4j;
                  import org.springframework.scheduling.annotation.EnableScheduling;
                  import org.springframework.scheduling.annotation.Scheduled;
                  import org.springframework.stereotype.Component;
                  /**
                   * 示例定时任务
                   *
                   * @author CL
                   */
                  @Slf4j
                  @Component
                  @EnableScheduling
                  public class DemoScheduled {
                      /**
                       * 示例方法
                       */
                      @Scheduled(cron = "0/3 * * * * ? ")
                      public void demo() {
                          log.info("线程 " + Thread.currentThread().getName() + " 正在执行Scheduled中的方法");
                      }
                  }