java使用jdbcTemplate查询并插入百万级数据解决方案 您所在的位置:网站首页 java批量更新百万数据 java使用jdbcTemplate查询并插入百万级数据解决方案

java使用jdbcTemplate查询并插入百万级数据解决方案

2024-07-09 20:16| 来源: 网络整理| 查看: 265

背景:使用JdbcTemplate查询500万数据,然后插入到数据库。

这么多的数据按照普通的方式直接查询然后插入,服务器肯定会挂掉,我尝试过使用分页查询的方式去进行分批查询插入,虽然也能达到保证服务器不挂掉的效果,但是有一个严重的问题,每次查询的数据很难保证顺序性,第一次一查询的数据可能又出现在第N次的查询结果中,虽然可以通过在查询sql中加上排序,可以保证多次查询的顺序不变,但是这种分页查询方式还是不够严谨,因为在多次查询过程中,可能数据有新增或删除,即使保证了排序唯一性,也会导致数据少取或取重复问题。

这个过程中需要解决的问题:

一、内存溢出

使用jdbcTemplate.queryForList查询一次读取500万条数据,会占用大量内存,一般的服务器都会内存溢出报错,jdbcTemplate默认使用RowMapperResultSetExtractor来处理ResultSet结果集,会将数据全部读取到内存:

因此我们需要自己写一个实现类继承ResultSetExtractor,去实现读取ResultSet的逻辑。

一、批量插入速度慢

我们使用jdbcTemplate的batchUpdate方法批量保存数据时,要想真正进行批量保存需要几个条件

1.首先要数据库本身要支持批量更新,一般主流数据库都会支持。

2.插入的sql语句不要使用子查询

插入语句只使用insert into table() values()这种,不要在values中使用select语句

3.数据源连接设置rewriteBatchedStatements=true这个参数

在oracle驱动中rewriteBatchedStatements参数默认是开启的,mysql没有开启,需要在数据源url连接中手动设置:

 自定义ResultSetExtractor如下:

package com.zhou.db.model; import com.zhou.db.util.SqlUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.support.JdbcUtils; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** * 查询数据自定义处理ResultSet * @author lang.zhou * @since 2023/1/9 17:42 */ @Slf4j public abstract class DataMapCallBackExtractor implements ResultSetExtractor { /** * 每次读取10000条时开始插入 */ @Getter private int batchQuerySize = 1000; @Getter private List columnList = new ArrayList(0); /** * 数据条数 */ @Getter private int dataCount = 0; public DataMapCallBackExtractor() { } public DataMapCallBackExtractor(int batchQuerySize) { if(batchQuerySize > 1000){ this.batchQuerySize = batchQuerySize; } } @Override public List extractData(ResultSet rs) throws SQLException, DataAccessException { ResultSetMetaData resultSetMetaData = rs.getMetaData(); //结果集列数 int count = resultSetMetaData.getColumnCount(); //已经执行回调的次数 int times = 0; //读取列信息 for (int i = 1; i < count + 1; i++) { columnList.add(SqlUtil.readResultColumn(resultSetMetaData,i)); } //读取列信息后回调 this.columnInfoCallback(columnList); List list = new ArrayList(); while(rs.next()){ //总条数增加 dataCount ++; Map e = new LinkedHashMap(count); //读取这一行的数据 for (int i = 1; i < count + 1; i++) { e.putIfAbsent(JdbcUtils.lookupColumnName(resultSetMetaData, i), JdbcUtils.getResultSetValue(rs, i)); } list.add(e); //读取满10000条时开始插入数据 if(list.size() >= batchQuerySize){ times ++; this.dataCallback(list,times,dataCount); //处理完成清空已读取的数据,释放内存 list.clear(); } } //可能最后一次读取不满10000条,插入剩余的数据 if(list.size() > 0){ times ++; this.dataCallback(list,times,dataCount); list.clear(); } return new ArrayList(0); } /** * 读取batchQuerySize条数据后自定义处理回调 */ public abstract void dataCallback(List list, int times, int n); /** * 读取列信息后回调 */ public void columnInfoCallback(List columnList){ } }

 我们拿到ResultSet后,每次只读取10000条数据存到List中,然后将这些数据插入数据库,在插入结束之后清空这个List,jvm会回收这些数据释放内存,一直重复这个过程直到结果集读取完毕,这样能保证内存中只流程10000条数据,就避免了内存泄漏的情况产生。

分批插入代码,提升插入速度:

/** * 数据分批插入 */ public void batchSizeUpdate(List list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate, int batchSize){ int size = list.size(); int n = size / batchSize; int l = size % batchSize; if(l > 0){ n++; } log.info("总共分"+n+"次插入"); for (int i = 0; i < n; i++) { int start = i*batchSize; int end = (i+1)*batchSize; if(end > size){ end = size; } batchUpdate(list.subList(start,end),sql, namedParameterJdbcTemplate); log.info("第"+(i+1)+"次插入完毕"); } } private void batchUpdate(List list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate){ Map [] param = new Map[list.size()]; for(int c= 0;c 0; } public boolean isDate(){ return name != null && ("DATE".equalsIgnoreCase(dataType) || "TIMESTAMP".equalsIgnoreCase(dataType) || "DATETIME".equalsIgnoreCase(dataType)); } public boolean isNumber(){ return name != null && ("NUMBER".equalsIgnoreCase(dataType) || "DECIMAL".equalsIgnoreCase(dataType) || "INTEGER".equalsIgnoreCase(dataType) || "INT".equalsIgnoreCase(dataType)|| "BIGINT".equalsIgnoreCase(dataType)|| "DOUBLE".equalsIgnoreCase(dataType)|| "LONG".equalsIgnoreCase(dataType))); } public boolean isChar(){ return name != null && "CHAR".equalsIgnoreCase(dataType); } public boolean allowNull(){ return !isPk() && Objects.equals(nullable,"Y"); } }



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有