当前因为工作需求,要发送大量Http请求,经过实践遍历发送需要6小时才能发送完毕,如果单线程发送请求会导致主线程阻塞。就会存在以下问题:
- 前端用户等待响应时间过长,无法进行下一步操作,不利于用户操作系统
- 响应时间过长超过Tomcat服务器会话时间,导致前后端的请求重新连接,这会出现抛出java.io.IOException: 你的主机中的软件中止了一个已建立的连接;重而终止了还未完成的Http发送任务
- 如果主线程其他任务如:定时Excel数据批量导入,文件上传等等;很容易因为文件格式问题,导致抛出异常,从而把Http的任务中断
- 夜长梦多,长时间发送请求,无法判断是否执行完毕;如果抛出异常,或是需要重启服务,无法判断执行到什么阶段,定位重发位置需要耗费很多时间精力,得不偿失,只能全部重发(一样面临以上问题)。
经过考虑可以使用多线程来解决问题,主线程负责开子线程处理请求。其中根据业务需要可以将多线程分为以下2类
Springboot中已经有集成的ThreadPoolTaskExecutor线程池可以供调用(注意区分Java自带的ThreadPoolExecutor),虽然2种线程池的提供的具体方法不一定一样,但是调度线程过程原理是通用,下面贴出ThreadPoolExecutor的调度线程过程作为创建ThreadPoolTaskExecutor的参考。
@EnableAsync 注解启用Spring 异步方法执行功能
当前的线程池 :
表示:
使用CallerRunsPolicy策略,主线程还没将 (请求创建的线程数 - 10 + 200 ) 的子线程创建完毕, 主线程处于阻塞状态,需要将所有子线程创建完毕才可以返回响应,即使任务是任务类型1,它的响应时间也会无限接近于 任务类型2;根据业务需求可以通过设置更长的队列长度来解决
线程处理完任务后,(最大线程池 - 核心线程池) > 0 && 经过60s空闲时间,就会回收除核心线程池外的空闲线程
@EnableAsync //注解启用Spring 异步方法执行功能 @Configuration public class AsyncTaskConfig { @Bean("AsyncTaskThreadPool") public Executor threadPoolExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数:线程池创建时候初始化的线程数 executor.setCorePoolSize(10); // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程 executor.setMaxPoolSize(20); // 缓冲队列:用来缓冲执行任务的队列 executor.setQueueCapacity(200); // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁 executor.setKeepAliveSeconds(60); // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池 executor.setThreadNamePrefix("AsyncTaskThreadPool-"); // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程) //AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常 //DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态 //DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务 //CallerRunsPolicy:不丢弃任务 由调用线程处理该任务 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭 executor.setAllowCoreThreadTimeOut(true); executor.initialize(); return executor; } }
@Async 根据添加到自定义线程池中执行
@Component public class AsyncRequestComponent { @Async("AsyncTaskThreadPool") public CompletableFuturemockHttpRequest(String url) throws InterruptedException, IOException { String threadName = Thread.currentThread().getName(); System.out.println("线程" + threadName + "开始调用,输出:" + url); String result = ""; // HttpClient RestTemplate Feign or 其他Http请求框架 // 或者其他需要 异步并发 的 逻辑代码 // Thread.sleep(50); result = HttpClientUtil.get(url); System.out.println("线程" + threadName + "调用结束,输出:" + url); return CompletableFuture.completedFuture(result); } }
@Slf4j @SpringBootTest class DemoTestApplicationTests { @Autowired private AsyncRequestComponent asyncRequestComponent; @Test void testAsyncThreadPerformance() throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); List> list = new ArrayList<>(); // 通过修改循环次数来查看 任务数<缓存队列+核心线程 和 任务数>缓存队列+核心线程 的主线程花费时间 // 如果 任务数<缓存队列+核心线程 主线程不需要等待 // 如果 任务数>缓存队列+核心线程 主线程需要等待 因为后面任务还没创建成功 for(int i=0 ;i < 2000; i++){ CompletableFuture future= asyncRequestComponent.mockHttpRequest(i); list.add(future); } // 任务类型1 主线程不需要等待所有子线程执行完毕,将下面的for循环注释 // 任务类型2 主线程需要等待所有子线程执行完毕:需要遍历future执行get方法等待子线程执行完毕 for(Future> future : list){ while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询 if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。 String result = (String) future.get();//获取结果 System.out.println("任务i=" + result + "获取完成!" + new Date()); break;//当前future获取结果完毕,跳出while } else { Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个 } } } long end = System.currentTimeMillis(); System.out.println("主线程花费时间:"+(end-start)); } }
CPU密集型:对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 8 核的 CPU,每个核一个线程,理论上创建 8 个线程就就可以;理论公式:最大线程数 = CPU核数 + 1
IO 密集型任务: IO 密集型任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。理论公式 : 最大线程数 = 最大线程数 = CPU核心数 / (1 - IO_TIME/REQUEST_TIME) ,比如在某个请求中,请求时长为10秒,调用IO时间为8秒,这时我们阻塞占百分比就是80%,有效利用CPU占比就是20%,假设是八核CPU,我们线程数就是8 / (1 - 80%) = 8 / 0.2 = 40个
最大线程数:
核心线程数: 核心线程数 = 最大线程数 * 20%
获取CPU核心数命令:
window - Java : Runtime.getRuntime().availableProcessors()//获取逻辑核心数,如6核心12线程,那么返回的是12
linux - 物理CPU: cat /proc/cpuinfo| grep “physical id”| sort| uniq| wc -l
linux - 物理CPU中core: cat /proc/cpuinfo| grep “cpu cores”| uniq
linux - 逻辑CPU: cat /proc/cpuinfo| grep “processor”| wc -l
偷懒大法:
参考资料:
https://blog.csdn.net/weixin_49258262/article/details/125463819?spm=1001.2014.3001.5506
https://blog.csdn.net/Wei_Naijia/article/details/127028398?spm=1001.2014.3001.5506
https://blog.csdn.net/wozyb1995/article/details/125044992?spm=1001.2014.3001.5506
https://blog.csdn.net/iampatrick_star/article/details/124490586?spm=1001.2014.3001.5506
https://www.cnblogs.com/651434092qq/p/14240406.html
https://juejin.cn/post/6948034657321484318
https://juejin.cn/post/7072281409053786120