SpringBoot+MyBatis流式查询,处理大规模数据,提高系统的性能和响应能力 | 您所在的位置:网站首页 › 大数据查询次数多久更新 › SpringBoot+MyBatis流式查询,处理大规模数据,提高系统的性能和响应能力 |
推荐大家关注一个公众号 推荐文章: 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、SpringBoot使用@Async实现多线程异步。 一、简介 在大量数据的时代,处理海量数据成为了许多应用的核心需求。而在Spring Boot与MyBatis的结合中,实现流式的查询可以有效地处理大规模的数据,并提高系统的性能和响应能力。若没有流式查询,想要从数据库取1000 万条数据而又没有足够的内存时,分页查询效率又取决于表设计,若设计不好,就无法执行高效的分页查询。因此流式查询是一个数据库访问框架必须具备的功能。 1.1、什么是mybatis的流式查询? 流式查询是一种分批次逐行读取数据的方式,是一个迭代器,可以通过遍历迭代器来取出结果集,而不是一次性将所有数据加载到内存中。通过流式查询,我们可以遍历迭代器来取出结果集,避免一次性取出大量的数据而占用太多的内存,提高系统的性能和可扩展性。 1.2、Cursor介绍 org.apache.ibatis.cursor.Cursor接口有三个抽象方法,分别是 1、isOpen() :判断cursor是否正处于打开状态; 2、isConsumed() :判断查询结果是否全部读取完; 3、getCurrentIndex() :查询已读取数据在全部数据里的索引位置; public interface Cursor extends Closeable, Iterable { //判断cursor是否正处于打开状态 //当返回true,则表示cursor已经开始从数据库里刷新数据了; boolean isOpen(); //判断查询结果是否全部读取完; //当返回true,则表示查询sql匹配的全部数据都消费完了; boolean isConsumed(); //查询已读取数据在全部数据里的索引位置; //第一条数据的索引位置为0;当返回索引位置为-1时,则表示已经没有数据可以读取; int getCurrentIndex();}二、实现流式查询的步骤 mybatis的流式查询,就是服务端程序查询数据的过程中,与远程数据库一直保持连接,不断地去数据库拉取数据,提交事务并关闭sqlsession后,数据库连接断开,停止数据拉取,需要注意的是使用这种方式,需要自己手动维护sqlsession和事务的提交。 2.1、mapper层接口 /** * 功能描述:采用流式查询limit条学生信息 * @MethodName: findStudentByScan * @MethodParam: [limit] * @Return: org.apache.ibatis.cursor.Cursor * @Author: yyalin * @CreateDate: 2022/3/9 13:48 */ @Select("select t.s_id AS studentId , t.student_name, t.age, t.phone, t.addr from student t limit #{limit}") @Options(fetchSize = 2) Cursor findStudentByScan(@Param("limit") long limit);2.2、service层实现接口 2.2.1、@Transactional注解方式 @Transactionalpublic List findStudentByScan(long limit) throws Exception { List students=new ArrayList(); log.info("入参limit:"+limit); //缺点:注意 Spring 框架当中注解使用的坑:只在外部调用时生效。 // 在当前类中调用这个方法,依旧会报错。 Cursor cursor2 =studentMapper.findStudentByScan(limit); //Cursor 实现了迭代器接口,因此在实际使用当中,从 Cursor 取数据非常简单 cursor2.forEach(stu -> { log.info("游标的当前索引CurrentIndex:"+cursor2.getCurrentIndex()); log.info("用于在取数据之前判断 Cursor 对象是否是打开状态。" + "只有当打开时 Cursor 才能取数据;:"+cursor2.isOpen()); //:true log.info("用于判断查询结果是否全部取完:"+cursor2.isConsumed()); //false log.info("输出的信息:"+ JSON.toJSONString(stu)); //可以对数据进行处理,主要用于处理大批量的数据 students.add(stu); }); log.info("返回多少条数据:"+(cursor2.getCurrentIndex()+1)); //返回多少条数据:需要加1才是返回总数 log.info("用于在取数据之前判断 Cursor 对象是否是打开状态。" + "只有当打开时 Cursor 才能取数据2;:"+cursor2.isOpen()); //:false log.info("用于判断查询结果是否全部取完2:"+cursor2.isConsumed()); //true if(cursor2.isConsumed()){ return students; } return students;}注意:Spring 框架当中注解使用的坑:只在外部调用时生效。在当前类中调用这个方法,依旧会报错。 日志输出: 2.2.2、SqlSessionFactory方式 //方案一:SqlSessionFactory方式 public List findStudentByScan01(long limit) throws Exception { List students=new ArrayList(); log.info("入参limit:"+limit); //方案一:SqlSessionFactory方式 // 创建sqlSessionFactory SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory"); //建立数据库连接,并保证最后可以关闭 SqlSession sqlSession = sqlSessionFactory.openSession(); //保证得到的 Cursor 对象是打开状态的 Cursor cursor = sqlSession.getMapper(StudentMapper.class) .findStudentByScan(limit); cursor.forEach(stu -> { //对获取到的数据进行处理 stu.getAge(); }); log.info("返回多少条数据:"+(cursor.getCurrentIndex()+1)); //返回多少条数据:需要加1才是返回总数 log.info("用于在取数据之前判断 Cursor 对象是否是打开状态。" + "只有当打开时 Cursor 才能取数据;:"+cursor.isOpen()); //:false log.info("用于判断查询结果是否全部取完:"+cursor.isConsumed()); //true if(cursor.isConsumed()){ // 关闭资源 sqlSession.close(); return students; } return students; }日志输出: 2.2.3、ApplicationContextUtils从容器中获取bean package com.wonders.common.utils;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component; /** * @author yyalin * @desc:此类的用途:使用ApplicationContextUtils工具类 在spring容器中获取beans * @create 2022-1-19 * @Version: V1.0 */@Componentpublic class ApplicationContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; /** * 功能描述:实现ApplicationContextAware接口的context注入函数, * 将其存入静态变量applicationContext中 * @MethodName: setApplicationContext * @MethodParam: [ctx] * @Author: yyalin * @CreateDate: 2022-1-19 13:54 */ @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { applicationContext = ctx; } /** * 功能描述:取得存储在静态变量中的ApplicationContext * @MethodName: getApplicationContext * @Return: org.springframework.context.ApplicationContext * @Author: yyalin * @CreateDate: 2022-1-19 16:38 */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 功能描述:从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型 * @MethodName: getBean * @MethodParam: [clazz] * @Author: yyalin * @CreateDate: 2022-1-19 17:13 */ public static T getBean(Class clazz) { return applicationContext.getBean(clazz); } /** * 功能描述:从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型 * @MethodName: getBean * @MethodParam: [name] * @Author: yyalin * @CreateDate: 2022-1-19 16:45 */ @SuppressWarnings("unchecked") public static T getBean(String name) { return (T) applicationContext.getBean(name); } }三、流式查询的优势 减少内存消耗:流式查询不会一次性加载所有数据到内存中,而是逐行读取数据,减少了内存的占用,特别适用于大数据量的场景。 提高系统性能:流式查询可以避免一次性加载大量数据导致的性能问题,通过分批次处理数据,提高系统的响应能力。 支持大数据处理:对于需要处理大规模数据的应用,流式查询可以有效地处理海量数据,提供高效的数据处理能力。 四、注意事项 mybatis的流式查询的本意,是避免大量数据的查询而导致内存溢出,因此dao层查询返回的是一个迭代器(Cursor),可以每次从迭代器中取出一条查询结果,在实际业务开发过程中,即是根据实际的jvm内存大小,从迭代器中取出一定数量的数据后,再进行数据处理,待处理完之后,继续取出一定数据再处理,以此类推直到全部数据处理完,这样做的最大好处就是能够降低内存使用和垃圾回收器的负担,使数据处理的过程相对更加高效、可控,内存溢出的风险较小; 好处很明显,缺点也很就明显,处理的时间可能会变长,需要引入多线程异步操作,并且在迭代器遍历和数据处理的过程中,数据库连接不能断开,即当前sqlSession要保持持续打开状态,一量断开,数据读取就会中断,所以关于这块的处理,使用mybatis原生的sqlSession进行手动查询、提交事务、回滚和关闭sqlSession最为稳妥、最简单。 更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。 参考: https://mp.weixin.qq.com/s/bpcdaGR4rPOroMUnv7PtgA |
CopyRight 2018-2019 实验室设备网 版权所有 |