java使用jdbcTemplate查询并插入百万级数据解决方案 | 您所在的位置:网站首页 › java批量更新百万数据 › java使用jdbcTemplate查询并插入百万级数据解决方案 |
背景:使用JdbcTemplate查询500万数据,然后插入到数据库。 这么多的数据按照普通的方式直接查询然后插入,服务器肯定会挂掉,我尝试过使用分页查询的方式去进行分批查询插入,虽然也能达到保证服务器不挂掉的效果,但是有一个严重的问题,每次查询的数据很难保证顺序性,第一次一查询的数据可能又出现在第N次的查询结果中,虽然可以通过在查询sql中加上排序,可以保证多次查询的顺序不变,但是这种分页查询方式还是不够严谨,因为在多次查询过程中,可能数据有新增或删除,即使保证了排序唯一性,也会导致数据少取或取重复问题。 这个过程中需要解决的问题: 一、内存溢出 使用jdbcTemplate.queryForList查询一次读取500万条数据,会占用大量内存,一般的服务器都会内存溢出报错,jdbcTemplate默认使用RowMapperResultSetExtractor来处理ResultSet结果集,会将数据全部读取到内存: 因此我们需要自己写一个实现类继承ResultSetExtractor,去实现读取ResultSet的逻辑。 一、批量插入速度慢 我们使用jdbcTemplate的batchUpdate方法批量保存数据时,要想真正进行批量保存需要几个条件 1.首先要数据库本身要支持批量更新,一般主流数据库都会支持。 2.插入的sql语句不要使用子查询 插入语句只使用insert into table() values()这种,不要在values中使用select语句 3.数据源连接设置rewriteBatchedStatements=true这个参数 在oracle驱动中rewriteBatchedStatements参数默认是开启的,mysql没有开启,需要在数据源url连接中手动设置: 自定义ResultSetExtractor如下: package com.zhou.db.model; import com.zhou.db.util.SqlUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.support.JdbcUtils; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** * 查询数据自定义处理ResultSet * @author lang.zhou * @since 2023/1/9 17:42 */ @Slf4j public abstract class DataMapCallBackExtractor implements ResultSetExtractor { /** * 每次读取10000条时开始插入 */ @Getter private int batchQuerySize = 1000; @Getter private List columnList = new ArrayList(0); /** * 数据条数 */ @Getter private int dataCount = 0; public DataMapCallBackExtractor() { } public DataMapCallBackExtractor(int batchQuerySize) { if(batchQuerySize > 1000){ this.batchQuerySize = batchQuerySize; } } @Override public List extractData(ResultSet rs) throws SQLException, DataAccessException { ResultSetMetaData resultSetMetaData = rs.getMetaData(); //结果集列数 int count = resultSetMetaData.getColumnCount(); //已经执行回调的次数 int times = 0; //读取列信息 for (int i = 1; i < count + 1; i++) { columnList.add(SqlUtil.readResultColumn(resultSetMetaData,i)); } //读取列信息后回调 this.columnInfoCallback(columnList); List list = new ArrayList(); while(rs.next()){ //总条数增加 dataCount ++; Map e = new LinkedHashMap(count); //读取这一行的数据 for (int i = 1; i < count + 1; i++) { e.putIfAbsent(JdbcUtils.lookupColumnName(resultSetMetaData, i), JdbcUtils.getResultSetValue(rs, i)); } list.add(e); //读取满10000条时开始插入数据 if(list.size() >= batchQuerySize){ times ++; this.dataCallback(list,times,dataCount); //处理完成清空已读取的数据,释放内存 list.clear(); } } //可能最后一次读取不满10000条,插入剩余的数据 if(list.size() > 0){ times ++; this.dataCallback(list,times,dataCount); list.clear(); } return new ArrayList(0); } /** * 读取batchQuerySize条数据后自定义处理回调 */ public abstract void dataCallback(List list, int times, int n); /** * 读取列信息后回调 */ public void columnInfoCallback(List columnList){ } }我们拿到ResultSet后,每次只读取10000条数据存到List中,然后将这些数据插入数据库,在插入结束之后清空这个List,jvm会回收这些数据释放内存,一直重复这个过程直到结果集读取完毕,这样能保证内存中只流程10000条数据,就避免了内存泄漏的情况产生。 分批插入代码,提升插入速度: /** * 数据分批插入 */ public void batchSizeUpdate(List list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate, int batchSize){ int size = list.size(); int n = size / batchSize; int l = size % batchSize; if(l > 0){ n++; } log.info("总共分"+n+"次插入"); for (int i = 0; i < n; i++) { int start = i*batchSize; int end = (i+1)*batchSize; if(end > size){ end = size; } batchUpdate(list.subList(start,end),sql, namedParameterJdbcTemplate); log.info("第"+(i+1)+"次插入完毕"); } } private void batchUpdate(List list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate){ Map [] param = new Map[list.size()]; for(int c= 0;c 0; } public boolean isDate(){ return name != null && ("DATE".equalsIgnoreCase(dataType) || "TIMESTAMP".equalsIgnoreCase(dataType) || "DATETIME".equalsIgnoreCase(dataType)); } public boolean isNumber(){ return name != null && ("NUMBER".equalsIgnoreCase(dataType) || "DECIMAL".equalsIgnoreCase(dataType) || "INTEGER".equalsIgnoreCase(dataType) || "INT".equalsIgnoreCase(dataType)|| "BIGINT".equalsIgnoreCase(dataType)|| "DOUBLE".equalsIgnoreCase(dataType)|| "LONG".equalsIgnoreCase(dataType))); } public boolean isChar(){ return name != null && "CHAR".equalsIgnoreCase(dataType); } public boolean allowNull(){ return !isPk() && Objects.equals(nullable,"Y"); } } |
CopyRight 2018-2019 实验室设备网 版权所有 |