相关推荐recommended
SpringBoot高效批量插入百万数据
作者:mmseoamin日期:2024-04-27

SpringBoot高效批量插入百万数据

前言:我相信很多小伙伴和我一样在初学的时候,面对几万几十万数据插入,不知道如何下手,面对百万级别数据时更是不知所措,我们大部分初学者,很多人都喜欢for循环插入数据,或者是开启多个线程,然后分批使用for循环插入,当我们需要将大量数据存储到数据库中时,传统的逐条插入方式显然效率低下,并且容易导致性能瓶颈。而批量插入是一种更加高效的方式,可以大幅提高数据的插入速度,特别是在数据量较大的情况下。本文将介绍如何使用 Spring Boot 实现高效批量插入百万数据,以解决传统逐条插入方式存在的性能问题。我们将使用不同的插入方式来比较。

1.抛出问题

传统的单条插入存在什么问题:

  1. 性能问题:如果循环插入的数据量比较大,每次插入都需要与数据库建立连接、执行插入操作,这将导致频繁的网络通信和数据库操作,性能会受到影响。可以考虑批量插入数据来提高性能,例如使用 JDBC 的批处理功能或使用框架提供的批处理方法。
  2. 事务问题:默认情况下,Spring Boot 的事务管理是基于注解的,每次循环插入数据都会开启一个新的事务,这可能导致事务管理的开销过大。可以考虑将整个循环插入数据放在一个事务中,或者使用编程式事务管理来控制事务的粒度。
  3. 数据库连接问题:在循环过程中,如果每次都重新获取数据库连接,可能会导致连接资源的浪费和性能下降。可以考虑使用连接池技术来管理数据库连接,确保连接的复用和高效利用。
  4. 异常处理问题:在循环插入数据时,可能会出现插入失败、异常等情况。需要适当处理异常,例如记录错误日志、回滚事务等,以确保数据的完整性和一致性。

2.前期准备工作

框架:springboot+mybatis plus +mysql

  • 准备工作

    创建一个简单的springboot项目,pom依赖

     
                  
                      org.springframework.boot
                      spring-boot-starter-web
                  
                  
                      com.baomidou
                      mybatis-plus-boot-starter
                      3.4.2
                  
                  
                      mysql
                      mysql-connector-java
                      8.0.33
                  
          
                  
                      org.projectlombok
                      lombok
                      true
                  
                  
                      cn.hutool
                      hutool-all
                      5.8.15
                  
                  
                      org.springframework.boot
                      spring-boot-starter-test
                      test
                  
              
    
    • 创建测试需要使用的表
      CREATE TABLE `student` (
              `id` int NOT NULL AUTO_INCREMENT,
              `name` varchar(255) DEFAULT NULL,
              `age` int DEFAULT NULL,
              `addr` varchar(255) DEFAULT NULL,
              `addr_Num` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
              PRIMARY KEY (`id`)
            ) ENGINE=InnoDB AUTO_INCREMENT=8497107 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
      
      • 修改application.yml配置
         server:
                port: 8090
              spring:
                datasource:
                  url:  jdbc:mysql://localhost:3306/boot_study?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
                  username: root
                  password: 123456
                  driver-class-name: com.mysql.cj.jdbc.Driver
        
        • 创建实体类StudentDO

          注意:在实际业务中,我们应该明确去定义controller service 层的数据模型,数据传输XxxDTO、数据表实体映射XxxDO、返回给前台数据实体XxxVO,这些模型数据都需要根据实际情况在Service实现类和Controller层转换

          这里为了演示方便就不按规范定义

          @TableName(value = "student")
                @Data
                public class StudentDO {
                    /**  主键  type:自增 */
                    @TableId(type = IdType.AUTO)
                    private Integer id;
                
                    /**  名字 */
                    private String name;
                
                    /**  年龄 */
                    private Integer age;
                
                    /**  地址 */
                    private String addr;
                
                    /**  地址号  @TableField:与表字段映射 */
                    @TableField(value = "addr_num")
                    private String addrNum;
                
                    public StudentDO(String name, int age, String addr, String addrNum) {
                        this.name = name;
                        this.age = age;
                        this.addr = addr;
                        this.addrNum = addrNum;
                    }
                }
          
          • controller定义
            @RestController
                  @RequestMapping("/student")
                  public class StudentController {
                      @Autowired
                      private StudentMapper studentMapper;
                      @Autowired
                      private StudentService studentService;
                      @Autowired
                      private SqlSessionFactory sqlSessionFactory;
                  
                      @Autowired
                      private ThreadPoolTaskExecutor taskExecutor;
                  
                      @Autowired
                      private PlatformTransactionManager transactionManager;
                      
                  
                  }
            
            • service和impl定义
                 public interface StudentService extends IService {
                    }
                    //实现定义
                    @Service
                    public class StudentServiceImpl extends ServiceImpl implements StudentService {
                    }
              
              • Mapper定义
                public interface StudentMapper extends BaseMapper {
                       @Insert("")
                          public int insertSplice(@Param("studentDOList") List studentDOList);
                      }
                

                3.测试示例演示

                模拟100万条数据不同方式插入

                1. for循环单条插入(不建议)

                  这里100万数据大概要20分钟以上,所以以10万条数据类推

                  10万条数据总耗时348.864秒

                      @GetMapping("/for")
                           public void insertForData () {
                               long start = System.currentTimeMillis();
                       
                               for (int i = 0; i < 1000000 ; i++) {
                                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                                   studentMapper.insert(StudentDO);
                       
                               }
                               long end = System.currentTimeMillis();
                               System.out.println("插入数据耗费时间:"+(end-start));
                           }
                

                结果:实际上不知道等了多久很慢很慢,总体时间差不多半个多小时,因为这里的for循环进行单条插入时,每次都是在获取连接(Connection)、释放连接和资源关闭等操作上,(如果数据量大的情况下)极其消耗资源,导致时间长。

                2. xml拼接foreach sql插入(大量数据不建议)

                10万条数据插入数据耗费时间:3.554秒

                 @GetMapping("/sql")
                           public void insertSqlData () {
                               long start = System.currentTimeMillis();
                               ArrayList arrayList = new ArrayList<>();
                               for (int i = 0; i < 100000 ; i++) {
                                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                                   arrayList.add(StudentDO);
                       
                               }
                               studentMapper.insertSplice(arrayList);
                               long end = System.currentTimeMillis();
                               System.out.println("插入数据耗费时间:"+(end-start));
                           }
                

                结果:我们在Mapper里面是要insert注解拼接,拼接结果就是将所有的数据集成在一条SQL语句的value值上,其由于提交到服务器上的insert语句少了,相就不需要每次获取连接(Connection)、释放连接和资源关闭,网络负载少了,插入的性能有了提高。但是在数据量大的情况下可能会出现内存溢出、解析SQL语句耗时等情况。

                3. mybatis-plus 批量插入saveBatch(推荐)

                10万条数据插入数据耗费时间:2.481秒,在数据量不大的情况下和上面差不多

                50万条数据插入数据耗费时间:12.473秒

                   @GetMapping("/batch")
                           public void insertSaveBatchData () {
                               long start = System.currentTimeMillis();
                               ArrayList arrayList = new ArrayList<>();
                               for (int i = 0; i < 100000 ; i++) {
                                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                                   arrayList.add(StudentDO);
                       
                               }
                               studentService.saveBatch(arrayList);
                               long end = System.currentTimeMillis();
                               System.out.println("插入数据耗费时间:"+(end-start));
                           }
                

                结果:使用MyBatis-Plus实现IService接口中批处理saveBatch()方法,可以很明显的看到性能有了提升,我们可以查看一下源码,它的底层实现原理利用分片处理(batchSize = 1000) + 分批提交事务的操作,来提高插入性能,并没有在连接上消耗性能,MySQLJDBC驱动默认情况下忽略saveBatch()方法中的executeBatch()语句,将需要批量处理的一组SQL语句进行拆散,执行时一条一条给MySQL数据库,造成实际上是分片插入,即与单条插入方式相比,有提高,但是性能未能得到实质性的提高。

                1. 手动开启批处理模式+批量插入手动提交(推荐)

                  10万条数据插入数据耗费时间:2.481秒,

                  50万条数据插入数据耗费时间:13.436秒

                  和上面相比不管是大数据量还是小数据量两者都是差不多

                @GetMapping("/forSaveBatch")
                           public void insertforSaveBatchData () {
                               //创建批量插入SqlSession
                               SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
                               StudentMapper studentMapper = sqlSession.getMapper(StudentMapper.class);
                               long start = System.currentTimeMillis();
                               ArrayList arrayList = new ArrayList<>();
                               for (int i = 0; i < 500000 ; i++) {
                                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                                   studentMapper.insert(StudentDO);
                       
                               }
                               sqlSession.commit();
                               sqlSession.close();
                               long end = System.currentTimeMillis();
                               System.out.println("插入数据耗费时间:"+(end-start));
                           }
                

                结果:手动开启批处理,手动处理关闭自动提交事务,共用同一个SqlSession之后,for循环单条插入的性能得到实质性的提高;由于同一个SqlSession省去对资源相关操作的耗能、减少对事务处理的时间等,从而极大程度上提高执行效率。

                5. ThreadPoolTaskExecutor分割多线程插入(大数据量强烈推荐)

                50万条数据插入数据耗费时间:3。536秒,插入速度直接是前面的4倍,是不是很疑惑这样就快了这么多?

                原理:多线程批量插入的过程,首先定义了一个线程池(ThreadPoolTaskExecutor),用于管理线程的生命周期和执行任务。然后,我们将要插入的数据列表按照指定的批次大小分割成多个子列表,并开启多个线程来执行插入操作,通过 TransactionManager 获取事务管理器,并使用 TransactionDefinition 定义事务属性。然后,在每个线程中,我们通过 transactionManager.getTransaction() 方法获取事务状态,并在插入操作中使用该状态来管理事务。

                在插入操作完成后,我们再根据操作结果调用transactionManager.commit()或 transactionManager.rollback() 方法来提交或回滚事务。在每个线程执行完毕后,都会调用 CountDownLatch 的 countDown() 方法,以便主线程等待所有线程都执行完毕后再返回。

                @GetMapping("/threadPoolInsert")
                           public void insertThreadPoolBatchData () {
                       
                       
                               ArrayList arrayList = new ArrayList<>();
                               for (int i = 0; i < 500000 ; i++) {
                                   StudentDO StudentDO = new StudentDO("张三"+i, i, "武汉"+i, i+"号");
                                   arrayList.add(StudentDO);
                       
                               }
                               int count = arrayList.size();
                               int pageSize = 10000;
                               int threadNum = count % pageSize == 0 ?  count / pageSize:  count / pageSize + 1;
                               CountDownLatch downLatch = new CountDownLatch(threadNum);
                               long start = System.currentTimeMillis();
                               for (int i = 0; i < threadNum; i++) {
                                   //开始序号
                                   int startIndex = i * pageSize;
                                   //结束序号
                                   int endIndex = Math.min(count, (i+1)*pageSize);
                                   //分割list
                                   List StudentDOs = arrayList.subList(startIndex, endIndex);
                                   taskExecutor.execute(() -> {
                                       DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
                                       TransactionStatus status = transactionManager.getTransaction(definition);
                                       try {
                       
                                           studentMapper.insertSplice(StudentDOs);
                                           transactionManager.commit(status);
                       
                                       }catch (Exception e){
                                           transactionManager.rollback(status);
                                           e.printStackTrace();
                       
                                       }finally {
                                           //执行完后 计数
                                           downLatch.countDown();
                                       }
                                   });
                       
                               }
                               try {
                                   //等待
                                   downLatch.await();
                               } catch (InterruptedException e) {
                                   throw new RuntimeException(e);
                               }
                               long end = System.currentTimeMillis();
                               System.out.println("插入数据耗费时间:"+(end-start));
                           }
                
                1. ThreadPoolTaskExecutor分割异步插入

                  除了上面多线程分割插入,我们也可以使用多线程异步插入其实和上面插入的原理是差不多,下面演示异步插入

                  • 修改application.yml增加配置

                    这个参数根据自己的电脑配置合理设置

                  async:
                           executor:
                             thread:
                               core_pool_size: 35
                               max_pool_size: 35
                               queue_capacity: 99999
                               name:
                                 prefix:  async-testDB-
                
                • 自定义ThreadPoolTaskExecutor配置
                  @EnableAsync
                           @Configuration
                           public class ExecutorConfig {
                               @Value("${async.executor.thread.core_pool_size}")
                               private int corePoolSize;
                               @Value("${async.executor.thread.max_pool_size}")
                               private int maxPoolSize;
                               @Value("${async.executor.thread.queue_capacity}")
                               private int queueCapacity;
                               @Value("${async.executor.thread.name.prefix}")
                               private String namePrefix;
                           
                               @Bean(name = "asyncServiceExecutor")
                               public Executor asyncServiceExecutor() {
                                   ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
                                   //设置核心线程数
                                   taskExecutor.setCorePoolSize(corePoolSize);
                                   //设置最大线程数
                                   taskExecutor.setMaxPoolSize(maxPoolSize);
                                   //设置队列容量
                                   taskExecutor.setQueueCapacity(queueCapacity);
                                   //设置线程名前缀
                                   taskExecutor.setThreadNamePrefix(namePrefix);
                                   //设置拒绝策略
                                   // rejection-policy:当pool已经达到max size的时候,如何处理新任务
                                   // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
                                   taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                                   return taskExecutor;
                               }
                           
                           }
                  
                  • 定义异步service和实现类
                     //接口服务
                             public interface AsyncService {
                             
                                 void executeAsync(List studentDOList, CountDownLatch countDownLatch);
                             }
                             
                             //实现类
                             @Service
                             public class AsyncServiceImpl extends ServiceImpl implements AsyncService {
                                 @Autowired
                                 private StudentService studentService;
                                 @Async("asyncServiceExecutor")
                                 @Override
                                 public void executeAsync(List studentDOList, CountDownLatch countDownLatch) {
                                     try{
                                         //异步线程要做的事情
                                         studentService.saveBatch(studentDOList);
                                     }finally {
                                         countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
                                     }
                                 }
                             }
                         
                    
                    • control代码

                      这里需要注意修改, 因为我们在ExecutorConfig配置类里面重新设置了ThreadPoolTaskExecutor

                      @Autowired

                      private ThreadPoolTaskExecutor taskExecutor;

                      改为

                      @Autowired

                      private Executor taskExecutor;

                      @GetMapping("/asyncInsertData")
                                   public void asyncInsertData() {
                                       List studentDOList = getTestData();
                                       //测试每100条数据插入开一个线程
                                       long start = System.currentTimeMillis();
                                       List> lists = ListUtil.split(studentDOList, 10000);
                                       CountDownLatch countDownLatch = new CountDownLatch(lists.size());
                                       for (List listSub:lists) {
                                           asyncService.executeAsync(listSub,countDownLatch);
                                       }
                                       try {
                                           countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
                                           // 这样就可以在下面拿到所有线程执行完的集合结果
                                       } catch (Exception e) {
                                           e.printStackTrace();
                                       }
                                       long end = System.currentTimeMillis();
                                       System.out.println("插入数据耗费时间:"+(end-start));
                                   }
                               
                               
                                   public List getTestData() {
                                       ArrayList arrayList = new ArrayList<>();
                                       for (int i = 0; i < 500000  ; i++) {
                                           StudentDO studentDO = new StudentDO("张三"+i, i, "武汉"+i, i+"号");
                                           arrayList.add(studentDO);
                               
                                       }
                                       return arrayList;
                                   }
                           
                      
                       50万条数据插入数据耗费时间:2.604秒,这里插入和上面差不多因为他们使用的都是多线程插入
                      

                      总结:经过上面的示例演示我们心里已经有谱了,知道什么时候该使用哪一种数据插入方式,针对对不同线程数的测试,发现不是线程数越多越好,具体多少合适,通常的算法:CPU核心数量*2 +2 个线程。

                      实际要根据每个人的电脑配置情况设置合适的线程数,可以根据下面这个公式获取:

                       int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
                          int corePoolSize = (int) (processNum / (1 - 0.2));
                          int maxPoolSize = (int) (processNum / (1 - 0.5));