mysql JDBC的三种查询(普通、流式、游标)
作者:mmseoamin日期:2023-12-21

使用JDBC向mysql发送查询时,有三种方式:

  • 常规查询:JDBC驱动会阻塞的一次性读取全部查询的数据到 JVM 内存中,或者分页读取
  • 流式查询:每次执行rs.next时会判断数据是否需要从mysql服务器获取,如果需要触发读取一批数据(可能n行)加载到 JVM 内存进行业务处理
  • 游标查询:通过 fetchSize 参数,控制每次从mysql服务器一次读取多少行数据

    1、常规查询

    public static void normalQuery() throws SQLException {
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
        PreparedStatement statement = connection.prepareStatement(sql);
        //statement.setFetchSize(100); //不起作用
        ResultSet resultSet = statement.executeQuery();
        
        while(resultSet.next()){
            System.out.println(resultSet.getString(2));
        }
        resultSet.close();
        statement.close();
        connection.close();
    }

    1)说明:

    1. 第四行设置featchSize不起作用。
    2. 第五行statement.executeQuery()执行查询会阻塞,因为需要等到所有数据返回并放到内存中;接下来每次执行resultSet.next()方法会从内存中获取数据。

    2)将jvm内存设置较小(-Xms16m -Xmx16m),对于大数据的查询会产生OOM:

    mysql JDBC的三种查询(普通、流式、游标),第1张

    为了避免OOM,通常我们会使用分页查询,或者下面的两种方式。

    2、流式查询

    public static void streamQuery() throws Exception { 
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
        PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
        statement.setFetchSize(Integer.MIN_VALUE); 
        //或者通过 com.mysql.jdbc.StatementImpl
        ((StatementImpl) statement).enableStreamingResults();
        
        ResultSet rs = statement.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString(2));
        }
        rs.close();
        statement.close();
        connection.close();
    }

    2.1)流式查询的条件:

    随着大数据的到来,对于百万、千万的数据使用流式查询可以有效避免OOM。在执行statement.executeQuery()时不会从TCP响应流中读取完所有数据,当下面执行rs.next()时会按照需要从TCP响应流中读取部分数据。

    1. 创建Statement的时候需要制定ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
    2. 设置fetchSize位Integer.MIN_VALUE

    或者通过com.mysql.jdbc.StatementImpl的enableStreamingResults()方法设置。二者是一致的。看mysql的jdbc(com.mysql.jdbc.StatementImpl)源码:

    mysql JDBC的三种查询(普通、流式、游标),第2张

    2.2)流式查询原理:

    1)基本概念

    我们要知道jdbc客户端和mysql服务器之间是通过TCP建立的通信,使用mysql协议进行传输数据。首先声明一个概念:在三次握手建立了TCP连接后,就可以在这个通道上进行通信了,直到关闭该连接。

    在 TCP 中发送端和接收端**可以是客户端/服务端,也可以是服务器/客户端**,通信的双方在任意时刻既可以是接收数据也可以是发送数据(全双工)。在通信中,收发双方都不保持记录的边界,所以需要按照一定的协议进行表示。在mysql中会按照mysql协议来进行交互。

    有了上面的概念,我们重新来定义这两种查询:

    在执行st.executeQuery()时,jdbc驱动会通过connection对象和mysql服务器建立TCP连接,同时在这个链接通道中发送sql命令,并接受返回。二者的区别是:

    1. 普通查询:也叫批量查询,jdbc客户端会阻塞的一次性从TCP通道中读取完mysql服务的返回数据;
    2. 流式查询:分批的从TCP通道中读取mysql服务返回的数据,每次读取的数据量并不是一行(通常是一个package大小),jdbc客户端在调用rs.next()方法时会根据需要从TCP流通道中读取部分数据。(并不是每次读区一行数据,网上说的几乎都是错的!)

    2)源码查看:

    从statement.executeQuery()方法跟进去,主要的调用连如下:

    protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly,
                Field[] metadataFromCache, boolean isBatch) throws SQLException {
            synchronized (checkClosed().getConnectionMutex()) {
                MySQLConnection locallyScopedConnection = this.connection;
                rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, this.resultSetType, this.resultSetConcurrency,
                                createStreamingResultSet, this.currentCatalog, metadataFromCache, isBatch);
                return rs;
            }
    public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency,
                boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
            synchronized (getConnectionMutex()) {
                return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                            cachedMetadata);
            }
    }
    final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,
                int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
            Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);
            ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket,
                        false, -1L, cachedMetadata);
            return rs;
    }
    ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,
                String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
            ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                    resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);
            return topLevelResultSet;
    }
    protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency,
                boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
                com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults,
                        catalog, isBinaryEncoded, metadataFromCache);
                return results;
            }
    }
    protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,
                boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
            Buffer packet; // The packet from the server
            RowData rowData = null;
            if (!streamResults) {
                rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
            } else {
                rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
                this.streamingData = rowData;
            }
            ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType,
                    resultSetConcurrency, isBinaryEncoded);
            return rs;
    }

    说明:

    1. sqlQueryDirect()方法中的sendCommand会通过io发送sql命令请求到mysql服务器,并获取返回流mysqlOutput
    2. getResultSet()方法会判断是否是流式查询还是批量查询。MySQL驱动会根据不同的参数设置选择对应的ResultSet实现类,分别对应三种查询方式:
    • RowDataStatic 静态结果集,默认的查询方式,普通查询
    • RowDataDynamic 动态结果集,流式查询
    • RowDataCursor 游标结果集,服务器端基于游标查询

      看上述代码(41行),对于批量查询:readSingleRowSet方法会循环掉用nextRow方法获取所有数据,然后放到jvm内存的rows中:

      mysql JDBC的三种查询(普通、流式、游标),第3张

      对于流式查询:直接创建RowDataDynamic对象返回。后面在掉用rs.next()获取数据时会根据需要从mysqlOutput流中读取数据。

      2.3)流式查询的坑:

      public static void streamQuery2() throws Exception { 
          Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
          //statement1
          PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
          statement.setFetchSize(Integer.MIN_VALUE); 
          ResultSet rs = statement.executeQuery();
          if (rs.next()) {
              System.out.println(rs.getString(2));
          }
          //statement2
          PreparedStatement statement2 = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
          statement2.setFetchSize(Integer.MIN_VALUE); 
          ResultSet rs2 = statement2.executeQuery();
          if (rs2.next()) {
              System.out.println(rs2.getString(2));
          }
      //      rs.close();
      //      statement.close();
      //      connection.close();
      }

      执行结果:

      test1
      java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@45c8e616 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
      	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:869)
      	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:865)
      	at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:3217)
      	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2453)
      	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
      	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2482)
      	at com.mysql.jdbc.StatementImpl.executeSimpleNonQuery(StatementImpl.java:1465)
      	at com.mysql.jdbc.StatementImpl.setupStreamingTimeout(StatementImpl.java:726)
      	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1939)
      	at com.tencent.clue_disp_api.MysqlTest.streamQuery2(MysqlTest.java:79)
      	at com.tencent.clue_disp_api.MysqlTest.main(MysqlTest.java:25)

      MySQL Connector/J 5.1 Developer Guide中原文:

      There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown. 也就是说当通过流式查询获取一个ResultSet后,通过next迭代出所有元素之前或者调用close关闭它之前,不能使用同一个数据库连接去发起另外一个查询,否者抛出异常(第一次调用的正常,第二次的抛出异常)。

      2.4)抓包验证:

      mysql JDBC的三种查询(普通、流式、游标),第4张

      查看3307 > 62169的包可以发现,ack都是1324,证明都是针对当时sql请求的返回数据。

      mysql JDBC的三种查询(普通、流式、游标),第5张

      3、游标查询

      public static void cursorQuery() throws Exception {
          Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false&useCursorFetch=true", "root", "123456");
          ((JDBC4Connection) connection).setUseCursorFetch(true); //com.mysql.jdbc.JDBC4Connection
          Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
          statement.setFetchSize(2);    
          ResultSet rs = statement.executeQuery(sql);    
          while (rs.next()) {
              System.out.println(rs.getString(2));
              Thread.sleep(5000);
          }
          
          rs.close();
          statement.close();
          connection.close();
      }

      1)说明:

      • 在连接参数中需要拼接useCursorFetch=true;
      • 创建Statement时需要设置ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
      • 设置fetchSize控制每一次获取多少条数据

        2)抓包验证:

        通过wireshark抓包,可以看到每执行一次rs.next() 就会向mysql服务发送一个请求,同时mysql服务返回两条数据:

        mysql JDBC的三种查询(普通、流式、游标),第6张

        3)游标查询需要注意的点:

        由于MySQL方不知道客户端什么时候将数据消费完,而自身的对应表可能会有DML写入操作,此时MySQL需要建立一个临时空间来存放需要拿走的数据。因此对于当你启用useCursorFetch读取大表的时候会看到MySQL上的几个现象:

        1. IOPS飙升 (IOPS (Input/Output Per Second):磁盘每秒的读写次数)
        2. 磁盘空间飙升
        3. 客户端JDBC发起SQL后,长时间等待SQL响应数据,这段时间就是服务端在准备数据
        4. 在数据准备完成后,开始传输数据的阶段,网络响应开始飙升,IOPS由“读写”转变为“读取”。
        5. CPU和内存会有一定比例的上升