本文主要讲述通过MyBatis、JDBC等做大数据量数据插入的案例和结果。
验证的数据库表结构如下:
CREATE TABLE `t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id', `username` varchar(64) DEFAULT NULL COMMENT '用户名称', `age` int(4) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表';
话不多说,开整!
/** *用户实体
* * @Author zjq * @Date 2021/8/3 */ @Data public class User { private int id; private String username; private int age; }
public interface UserMapper {
    /**
     * 批量插入用户
     * @param userList
     */
    void batchInsertUser(@Param("list") List userList);
}
  
    
    
        insert into t_user(username,age) values
        
            (
            #{item.username},
            #{item.age}
            )
         
     
 
jdbc.driver=com.mysql.jdbc.Driver jdbc.url=jdbc:mysql://localhost:3306/test jdbc.username=root jdbc.password=root
MyBatis直接一次性批量插入30万条,代码如下:
    @Test
    public void testBatchInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        try {
            List userList = new ArrayList<>();
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                userList.add(user);
            }
            session.insert("batchInsertUser", userList); // 最后插入剩余的数据
            session.commit();
            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } finally {
            session.close();
        }
    }
  
可以看到控制台输出:
Cause: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (27759038 >yun 4194304). You can change this value on the server by setting the max_allowed_packet’ variable.
 yun 4194304). You can change this value on the server by setting the max_allowed_packet' variable" />
yun 4194304). You can change this value on the server by setting the max_allowed_packet' variable" />
超出最大数据包限制了,可以通过调整max_allowed_packet限制来提高可以传输的内容,不过由于30万条数据超出太多,这个不可取,梭哈看来是不行了 😅😅😅
既然梭哈不行那我们就一条一条循环着插入行不行呢
mapper接口和mapper文件中新增单个用户新增的内容如下:
    /**
     * 新增单个用户
     * @param user
     */
    void insertUser(User user);
 
    
    
        insert into t_user(username,age) values
            (
            #{username},
            #{age}
            )
     
 
调整执行代码如下:
    @Test
    public void testCirculateInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        try {
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                // 一条一条新增
                session.insert("insertUser", user);
                session.commit();
            }
            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } finally {
            session.close();
        }
    }
 
执行后可以发现磁盘IO占比飙升,一直处于高位。

等啊等等啊等,好久还没执行完

先不管他了太慢了先搞其他的,等会再来看看结果吧。
two thousand year later …
控制台输出如下:

总共执行了14909367毫秒,换算出来是4小时八分钟。太慢了。。

👇👇👇还是优化下之前的批处理方案吧
先清理表数据,然后优化批处理执行插入:
-- 清空用户表 TRUNCATE table t_user;
以下是通过 MyBatis 实现 30 万条数据插入代码实现:
    /**
     * 分批次批量插入
     * @throws IOException
     */
    @Test
    public void testBatchInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        int waitTime = 10;
        try {
            List userList = new ArrayList<>();
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                userList.add(user);
                if (i % 1000 == 0) {
                    session.insert("batchInsertUser", userList);
                    // 每 1000 条数据提交一次事务
                    session.commit();
                    userList.clear();
                    // 等待一段时间
                    Thread.sleep(waitTime * 1000);
                }
            }
            // 最后插入剩余的数据
            if(!CollectionUtils.isEmpty(userList)) {
                session.insert("batchInsertUser", userList);
                session.commit();
            }
            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            session.close();
        }
    }
  
使用了 MyBatis 的批处理操作,将每 1000 条数据放在一个批次中插入,能够较为有效地提高插入速度。同时请注意在循环插入时要带有合适的等待时间和批处理大小,以防止出现内存占用过高等问题。此外,还需要在配置文件中设置合理的连接池和数据库的参数,以获得更好的性能。

在上面的示例中,我们每插入1000行数据就进行一次批处理提交,并等待10秒钟。这有助于控制内存占用,并确保插入操作平稳进行。

五十分钟执行完毕,时间主要用在了等待上。
如果低谷时期执行,CPU和磁盘性能又足够的情况下,直接批处理不等待执行:
    /**
     * 分批次批量插入
     * @throws IOException
     */
    @Test
    public void testBatchInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        int waitTime = 10;
        try {
            List userList = new ArrayList<>();
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                userList.add(user);
                if (i % 1000 == 0) {
                    session.insert("batchInsertUser", userList);
                    // 每 1000 条数据提交一次事务
                    session.commit();
                    userList.clear();
                }
            }
            // 最后插入剩余的数据
            if(!CollectionUtils.isEmpty(userList)) {
                session.insert("batchInsertUser", userList);
                session.commit();
            }
            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            session.close();
        }
    }
  
则24秒可以完成数据插入操作:


可以看到短时CPU和磁盘占用会飙高。
把批处理的量再调大一些调到5000,在执行:

13秒插入成功30万条,直接芜湖起飞🛫🛫🛫
JDBC循环插入的话跟上面的mybatis逐条插入类似,不再赘述。
以下是 Java 使用 JDBC 批处理实现 30 万条数据插入的示例代码。请注意,该代码仅提供思路,具体实现需根据实际情况进行修改。
    /**
     * JDBC分批次批量插入
     * @throws IOException
     */
    @Test
    public void testJDBCBatchInsertUser() throws IOException {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        String databaseURL = "jdbc:mysql://localhost:3306/test";
        String user = "root";
        String password = "root";
        try {
            connection = DriverManager.getConnection(databaseURL, user, password);
            // 关闭自动提交事务,改为手动提交
            connection.setAutoCommit(false);
            System.out.println("===== 开始插入数据 =====");
            long startTime = System.currentTimeMillis();
            String sqlInsert = "INSERT INTO t_user ( username, age) VALUES ( ?, ?)";
            preparedStatement = connection.prepareStatement(sqlInsert);
            Random random = new Random();
            for (int i = 1; i <= 300000; i++) {
                preparedStatement.setString(1, "共饮一杯无 " + i);
                preparedStatement.setInt(2, random.nextInt(100));
                // 添加到批处理中
                preparedStatement.addBatch();
                if (i % 1000 == 0) {
                    // 每1000条数据提交一次
                    preparedStatement.executeBatch();
                    connection.commit();
                    System.out.println("成功插入第 "+ i+" 条数据");
                }
            }
            // 处理剩余的数据
            preparedStatement.executeBatch();
            connection.commit();
            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } catch (SQLException e) {
            System.out.println("Error: " + e.getMessage());
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
 


上述示例代码中,我们通过 JDBC 连接 MySQL 数据库,并执行批处理操作插入数据。具体实现步骤如下:
使用setAutoCommit(false) 来禁止自动提交事务,然后在每次批量插入之后手动提交事务。每次插入数据时都新建一个 PreparedStatement 对象以避免状态不一致问题。在插入数据的循环中,每 10000 条数据就执行一次 executeBatch() 插入数据。
另外,需要根据实际情况优化连接池和数据库的相关配置,以防止连接超时等问题。
实现高效的大量数据插入需要结合以下优化策略(建议综合使用):
本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位指出。
主页:共饮一杯无的博客汇总👨💻
保持热爱,奔赴下一场山海。🏃🏃🏃