多线程读写大量数据到excel 您所在的位置:网站首页 并发写入文件 多线程读写大量数据到excel

多线程读写大量数据到excel

2024-06-27 04:42| 来源: 网络整理| 查看: 265

起因是这样的:用户要下载对账单明细,也就是交易明细。我们公司的数据库设计是一天一张表,类似于 trace_20190708、trace_20190709 ……这样的命名,所以设计生产者的时候,让一个线程负责某一天的表。线程数不要超过查询的天数总和(例如读取 20190720-20190730 这十一天的数据,那么线程数不能超过 11 个),也不能过多,要看自己系统的内存情况,否则会有线程被一直阻塞。

注:其实也可以用多线程读同一张表,这里提供下思路。一般是分页读取(例如 select * from user limit 0,10,下面就直接用 limit x,y 代表了):假设每页 100 行,可以让 A 线程读取第 2n 页(limit 2n*100,100),B 线程读取第 2n+1 页(limit (2n+1)*100,100)。这样设计 sql 的目的是不用加锁。若是多个线程共用同一个偏移量(limit n,100),就会有并发问题,那么就要加锁:比如我在读 limit 10,20,那么你要读 limit 30,20 就得等我读完。但是我用多线程就是为了读取速度快、并发执行,加锁等待了,多线程的意义何在?所以在 sql 上做些文章就好。

测试类

package com.example.demo.test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Entry point for the POI export demo: creates the thread pool and the completion latch,
 * then delegates the whole export to {@link PoiExcel#multiThreadWrite}.
 *
 * <p>Original header: author myc, 2019/6/26 16:51; "Test POI", author alex.
 */
public class PoiTest {

    public static void main(String[] args) {
        // Fixed pool shared by the producer threads and the writer thread.
        ExecutorService es = Executors.newFixedThreadPool(20);
        // Latch counted down once by the writer side when every row has been consumed.
        CountDownLatch doneSignal = new CountDownLatch(1);
        String xlsFile = "d:/xlsx/poiSXXFSBigData" + "2019010" + ".xlsx";
        try {
            PoiExcel.multiThreadWrite(es, doneSignal, xlsFile);
        } finally {
            // Always release the pool, even if the export fails with an unchecked error;
            // otherwise the non-daemon worker threads keep the JVM alive.
            es.shutdown();
        }
    }
}

读写excel:这里采用了生产者-消费者模式,将读数据库与写入excel两个步骤分离,中间用 BlockingQueue 传递数据。考虑到excel一个工作表(sheet)的容量大概是100W行,所以将数据分到多个工作表中处理,这样就可以读写更多数据。但是多线程下存在读写并发问题,在多个消费者的情况下会出问题,所以建议只开一个消费者。

sheet.setRandomAccessWindowSize(windowSize);注意这个设置,这个设置是将数据读到内存中windowSize个,超过这个数后就写入到磁盘临时文件中。若是设置为 -1,那么数据会全部读取到内存中,这样数据过多内存会被撑爆

package com.example.demo.test;

import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.util.SheetUtil;
import org.apache.poi.xssf.streaming.SXSSFSheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;

import java.io.FileOutputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Reads large result sets and writes them to an Excel file with POI's streaming workbook.
 *
 * <p>Producers ({@link PoiProductor}) page rows out of the database and put each page on
 * {@link #queue}; a writer ({@code PoiWriter}) calls {@link #customerData} to append the
 * rows to the workbook. All state is static, so one export per JVM run is supported.
 *
 * <p>Original header: author myc, 2019/6/28 14:12.
 */
public class PoiExcel {

    /** Date pattern used in table-name suffixes. Not thread-safe; only used by the calling thread. */
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

    protected static volatile SXSSFWorkbook wb = new SXSSFWorkbook(100);

    /** Total number of data rows written so far (across all sheets). */
    protected static AtomicInteger rowNo = new AtomicInteger(0);

    /** Row number inside the current sheet. */
    protected static AtomicInteger pageRowNo = new AtomicInteger(0);

    /** Number of sheets that have been started (index of the next sheet to use). */
    protected static AtomicInteger count = new AtomicInteger(0);

    /** Pages of rows handed from the producers to the writer. */
    public static BlockingQueue<List<PoiEntity>> queue = new ArrayBlockingQueue<>(5);

    /** Query templates still waiting for a producer; one entry per source table. */
    public static BlockingQueue<String> sqlQueue = new LinkedBlockingQueue<>();

    /** Maximum number of data rows per Excel sheet. */
    protected static final int MAX_SHEET_PAGE_SIZE = 200000;

    /** Total number of records to export. */
    protected static int TOTAL_COUNT;

    /** Rows still to be written; when it reaches zero the latch is counted down. */
    protected static AtomicInteger DONESIGNAL_COUNT;

    /** Names of the source tables. */
    protected static List<String> tableNameArr = new ArrayList<>();

    /** Number of rows fetched from the database per query. */
    public static int PAGE_SIZE = 50000;

    /** Sheet currently being written. */
    protected static volatile SXSSFSheet sheet;

    /** All sheets of the workbook, in creation order. */
    protected static ArrayList<SXSSFSheet> sheetList = new ArrayList<>();

    /** Number of columns. */
    private static final int cellLength = 10;

    private static final int windowSize = 200;

    /** Upper bound on producer threads started by {@link #multiThreadWrite}. */
    private static final int MAX_PRODUCERS = 10;

    /** Header captions, in column order. */
    private static final String[] HEADERS = {
        "traceNo", "merchantNum", "totalFee", "merchantName", "dynamicType",
        " transBegin ", "transStatus", "refundFee", "outTransNo", "rateFee"
    };

    /**
     * Takes one page from {@link #queue} (blocking) and appends its rows to the workbook,
     * switching to the next sheet every {@link #MAX_SHEET_PAGE_SIZE} rows.
     *
     * @param doneSignal counted down once the remaining-row counter reaches zero
     * @throws Exception if the thread is interrupted while waiting or a POI call fails
     */
    public void customerData(CountDownLatch doneSignal) throws Exception {
        List<PoiEntity> list = queue.take();
        int len = list.size();
        for (int i = 0; i < len; i++) {
            Row rowValue;
            synchronized (rowNo) {
                boolean sheetFull = rowNo.get() % MAX_SHEET_PAGE_SIZE == 0;
                if (sheetFull && count.get() < sheetList.size()) {
                    sheet = sheetList.get(count.get());
                    // A new sheet starts again at row 0 (the header row).
                    pageRowNo.set(0);
                    createHeader();
                    setColumnWidthByType(sheet, cellLength);
                    count.incrementAndGet();
                }
                rowValue = sheet.createRow(pageRowNo.incrementAndGet());
                rowNo.incrementAndGet();
            }
            addToRow(i, rowValue, list);
        }
        System.out.println("sheet name " + sheet.getSheetName() + " rowNo " + rowNo + " DONESIGNAL_COUNT " + DONESIGNAL_COUNT);
        // Decrement and test in one atomic step so the zero crossing is observed exactly once.
        int remaining = DONESIGNAL_COUNT.addAndGet(-len);
        list.clear();
        if (remaining == 0) {
            doneSignal.countDown();
        }
    }

    /**
     * Writes element {@code i} of {@code _list} into the cells of {@code row_value}.
     * The bankType and terminalNum columns were commented out in the original and are not written.
     *
     * @param i         index of the entity inside {@code _list}
     * @param row_value target row
     * @param _list     page of entities
     */
    protected void addToRow(int i, Row row_value, List<PoiEntity> _list) {
        PoiEntity entity = _list.get(i);
        row_value.createCell(0).setCellValue(entity.getTraceNo());
        row_value.createCell(1).setCellValue(entity.getMerchantNum());
        row_value.createCell(2).setCellValue(entity.getTotalFee() + "");
        row_value.createCell(3).setCellValue(entity.getMerchantName());
        row_value.createCell(4).setCellValue(entity.getDynamicType());
        row_value.createCell(5).setCellValue(entity.getTransBegin());
        row_value.createCell(6).setCellValue(entity.getTransStatus());
        row_value.createCell(7).setCellValue(entity.getRefundFee() + "");
        row_value.createCell(8).setCellValue(entity.getOutTransNo());
        row_value.createCell(9).setCellValue(entity.getRateFee());
    }

    /**
     * Creates the header row (row 0) of the current sheet.
     */
    protected void createHeader() {
        Row row = sheet.createRow(0);
        for (int col = 0; col < HEADERS.length; col++) {
            Cell cell = row.createCell(col);
            cell.setCellValue(HEADERS[col]);
        }
    }

    /**
     * Sums {@code select count(1)} over all given tables.
     *
     * @param tableNameArr table names to count
     * @return total number of rows
     * @throws Exception if a query fails
     */
    protected static int getListCount(List<String> tableNameArr) throws Exception {
        // NOTE(review): the statement comes from DbUtils, whose ownership rules are not visible
        // here, so only the result sets are closed — confirm whether stmt must be closed too.
        PreparedStatement stmt = DbUtils.getStm();
        int listCount = 0;
        for (String tableName : tableNameArr) {
            String queryCount = "select count(1) from " + tableName;
            try (ResultSet rsTotal = stmt.executeQuery(queryCount)) {
                rsTotal.next();
                listCount += rsTotal.getInt(1);
            }
        }
        return listCount;
    }

    /**
     * Creates as many sheets as are needed to hold {@link #TOTAL_COUNT} rows
     * (at least one) and registers them in {@link #sheetList}.
     *
     * @param sheet unused; kept so existing callers keep compiling
     */
    protected static void setSheet(SXSSFSheet sheet) {
        int size = 1;
        if (TOTAL_COUNT > MAX_SHEET_PAGE_SIZE) {
            // Ceiling division: an exact multiple must not produce an extra, empty sheet.
            size = TOTAL_COUNT / MAX_SHEET_PAGE_SIZE
                    + (TOTAL_COUNT % MAX_SHEET_PAGE_SIZE == 0 ? 0 : 1);
        }
        for (int i = 0; i < size; i++) {
            SXSSFSheet created = wb.createSheet("我的第" + i + "个工作簿");
            // Keep at most windowSize rows in memory; older rows are flushed to disk.
            created.setRandomAccessWindowSize(windowSize);
            sheetList.add(created);
        }
    }

    /**
     * Sets the width of the first {@code titleLength} columns: sized from the header text
     * when that is longer than the measured content width, auto-sized otherwise.
     */
    private void setColumnWidthByType(SXSSFSheet sheet, int titleLength) {
        sheet.trackAllColumnsForAutoSizing();
        for (int i = 0; i < titleLength; i++) {
            // Length of the header caption in characters.
            int columnWidth = sheet.getRow(0).getCell(i).getStringCellValue().length();
            int autowidth = (int) SheetUtil.getColumnWidth(sheet, i, false, 1, sheet.getLastRowNum());
            if (columnWidth > autowidth) {
                sheet.setColumnWidth(i, 400 * (columnWidth + 1));
            } else {
                sheet.autoSizeColumn(i);
            }
        }
    }

    /**
     * Registers the three source tables (two Xingye, one Huifu) for every date.
     *
     * @param dateList dates formatted as {@code yyyyMMdd}
     */
    private static void addTable(List<String> dateList) {
        for (String date : dateList) {
            // Xingye
            tableNameArr.add("xingye_bill_download_day_101590267206_" + date);
            tableNameArr.add("xingye_bill_download_day_101540080217_" + date);
            // Huifu
            tableNameArr.add("posp_huifu_detail_day_" + date);
        }
    }

    /**
     * Builds the paging query templates, three per date, in the same order as
     * {@link #addTable}. The literal {@code ownId} is a placeholder that the producer
     * replaces with the last id it has read.
     *
     * @param dateList dates formatted as {@code yyyyMMdd}
     * @param where    extra SQL appended after {@code id > ownId}; may be empty
     * @return the query templates
     */
    private static List<String> getSqlList(List<String> dateList, String where) {
        List<String> listSql = new ArrayList<>();
        for (String date : dateList) {
            // Xingye
            listSql.add(xingyeSql("xingye_bill_download_day_101590267206_" + date, where));
            listSql.add(xingyeSql("xingye_bill_download_day_101540080217_" + date, where));
            // Huifu
            listSql.add(huifuSql("posp_huifu_detail_day_" + date, where));
        }
        return listSql;
    }

    /** Query template for one Xingye table. */
    private static String xingyeSql(String table, String where) {
        return "select id,merchant_num merchantNum,merchant_name merchantName,trans_type dynamicType,trans_time transBegin"
                + " ,trans_status transStatus,total_fee totalFee,refund_fee refundFee,third_merchant_num outTransNo,"
                + " rate_fee rateFee ,trace_num traceNo from " + table
                + " WHERE id > ownId " + where + " ORDER BY id ASC LIMIT " + PAGE_SIZE;
    }

    /** Query template for one Huifu table. */
    private static String huifuSql(String table, String where) {
        return "select id,merchant_num merchantNum,merchant_name merchantName ,trade_type dynamicType,DATE_FORMAT(trade_date,'%Y-%m-%d-%H-%i-%s') transBegin "
                + " ,trade_status transStatus,total_fee totalFee,recorded_money refundFee,outside_order_num outTransNo, "
                + " rate_fee rateFee ,trace_num traceNo from " + table
                + " WHERE id > ownId " + where + " ORDER BY id ASC LIMIT " + PAGE_SIZE;
    }

    /**
     * Lists every day from {@code dBegin} to {@code dEnd}, formatted as {@code yyyyMMdd}.
     * {@code dBegin} is always included, even when it is not before {@code dEnd}.
     *
     * @param dBegin first day
     * @param dEnd   last day
     * @return the formatted days in ascending order
     */
    public static List<String> findDates(Date dBegin, Date dEnd) {
        List<String> lDate = new ArrayList<>();
        lDate.add(sdf.format(dBegin));
        Calendar calBegin = Calendar.getInstance();
        calBegin.setTime(dBegin);
        // Advance one day at a time while the cursor is still before dEnd.
        while (dEnd.after(calBegin.getTime())) {
            calBegin.add(Calendar.DAY_OF_MONTH, 1);
            lDate.add(sdf.format(calBegin.getTime()));
        }
        return lDate;
    }

    /**
     * Runs the whole export: counts the rows, prepares the sheets, starts the producers and
     * the writer, waits for {@code doneSignal}, then writes the workbook to {@code xlsFile}.
     * Failures are printed and not rethrown.
     *
     * @param es         executor that runs the producer and writer tasks
     * @param doneSignal latch the writer side counts down when all rows are written
     * @param xlsFile    path of the output file
     */
    public static void multiThreadWrite(ExecutorService es, CountDownLatch doneSignal, String xlsFile) {
        try {
            long startTime = System.currentTimeMillis();
            Date begin = sdf.parse("20190602");
            Date end = sdf.parse("20190602");
            List<String> dateList = findDates(begin, end);
            addTable(dateList);
            TOTAL_COUNT = getListCount(tableNameArr);
            DONESIGNAL_COUNT = new AtomicInteger(TOTAL_COUNT);
            setSheet(sheet);
            sheet = sheetList.get(0);
            String where = "";
            // The queue is unbounded, so a plain bulk add cannot block or fail on capacity.
            sqlQueue.addAll(getSqlList(dateList, where));

            // Never start more producers than there are queries, and at most MAX_PRODUCERS.
            int producers = Math.min(sqlQueue.size(), MAX_PRODUCERS);
            for (int i = 0; i < producers; i++) {
                es.submit(new PoiProductor(doneSignal));
            }
            PoiExcel poiExcel = new PoiExcel();
            es.submit(new PoiWriter(doneSignal, poiExcel));
            doneSignal.await();
            System.out.println("read finish  execute time: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
            try (FileOutputStream os = new FileOutputStream(xlsFile)) {
                wb.write(os);
                os.flush();
            } finally {
                // SXSSF spills rows to temporary files that must be removed explicitly.
                wb.dispose();
            }
            System.out.println(" outPutStream finish  execute time: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

从数据库读数 生产者   ↵

package com.example.demo.test;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Producer: reads the statement tables page by page and puts each page on
 * {@code PoiExcel.queue}. Each thread works on one table at a time and, when that table is
 * exhausted, claims the next remaining query from {@code PoiExcel.sqlQueue}.
 *
 * <p>Original header: author myc, 2019/6/28 17:37. The original note advises not to start
 * more producer threads than there are entries in {@code sqlQueue}.
 */
public class PoiProductor implements Runnable {

    private final CountDownLatch doneSignal;

    public PoiProductor(CountDownLatch doneSignal) {
        this.doneSignal = doneSignal;
    }

    /**
     * Claims query templates until none are left and streams every page of each one to the
     * writer. An empty page is still published as the last page of a table, as before.
     */
    @Override
    public void run() {
        String sql;
        // poll() claims a query atomically; size()-then-take() could block forever when
        // another producer grabs the last entry in between.
        while ((sql = PoiExcel.sqlQueue.poll()) != null) {
            try {
                readTable(sql);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (Exception e) {
                // Give up on this table instead of retrying the same failing page forever.
                e.printStackTrace();
            }
        }
    }

    /** Pages through one table, publishing each page, until an empty page is returned. */
    private static void readTable(String sql) throws Exception {
        int lastId = 0;
        while (true) {
            List<PoiEntity> page = getListData(sql, lastId);
            if (page == null) {
                return;
            }
            // Read everything needed from the page BEFORE publishing it: the consumer
            // clears the list after writing, so it must not be touched afterwards.
            boolean exhausted = page.isEmpty();
            if (!exhausted) {
                lastId = getId(page);
            }
            PoiExcel.queue.put(page);
            if (exhausted) {
                return;
            }
        }
    }

    /**
     * Returns the id of the last element of {@code list}.
     *
     * @param list non-empty page of entities
     * @return id of the last entity
     * @throws Exception if the list is empty
     */
    public static int getId(List<PoiEntity> list) throws Exception {
        return list.get(list.size() - 1).getId();
    }

    /** Maps every remaining row of {@code rs} to a {@code PoiEntity} and appends it to {@code _list}. */
    private static void addToList(List<PoiEntity> _list, ResultSet rs) throws SQLException, ParseException {
        while (rs.next()) {
            String dynamicType = getDynamicType(rs);
            String transStatus = getTransStatus(rs);
            PoiEntity entity = new PoiEntity();
            entity.setId(rs.getInt("id"));
            entity.setTraceNo(rs.getString("traceNo"));
            entity.setMerchantNum(rs.getString("merchantNum"));
            entity.setTotalFee(rs.getBigDecimal("totalFee"));
            entity.setMerchantName(rs.getString("merchantName"));
            entity.setDynamicType(dynamicType);
            entity.setTransBegin(rs.getString("transBegin"));
            entity.setTransStatus(transStatus);
            entity.setRefundFee(rs.getBigDecimal("refundFee"));
            entity.setOutTransNo(rs.getString("outTransNo"));
            entity.setRateFee(rs.getString("rateFee"));
            _list.add(entity);
        }
    }

    /** Reads the {@code transStatus} column unchanged. */
    private static String getTransStatus(ResultSet rs) throws SQLException {
        return rs.getString("transStatus");
    }

    /** Reads the {@code dynamicType} column unchanged. */
    private static String getDynamicType(ResultSet rs) throws SQLException {
        return rs.getString("dynamicType");
    }

    /**
     * Runs one page query: replaces the {@code ownId} placeholder in {@code sql} with
     * {@code id} and maps the result rows.
     *
     * @param sql query template containing the {@code ownId} placeholder
     * @param id  last id already read; rows with a greater id are fetched
     * @return the rows of the page (possibly empty), or {@code null} if the driver returned
     *         no result set
     * @throws Exception if the query or the row mapping fails
     */
    public static List<PoiEntity> getListData(String sql, int id) throws Exception {
        List<PoiEntity> rows = new ArrayList<>();
        // NOTE(review): statement ownership lives in DbUtils, which is not visible here;
        // only the result set is closed — confirm whether stmt needs closing as well.
        PreparedStatement stmt = DbUtils.getStm();
        String pagedSql = sql.replace("ownId", id + "");
        try (ResultSet rs = stmt.executeQuery(pagedSql)) {
            if (null == rs) {
                return null;
            }
            addToList(rows, rs);
            return rows;
        }
    }
}

将数据写入到excel     --消费者

package com.example.demo.test; import java.util.concurrent.CountDownLatch; /** * @Author: myc 进行sheet写操作 * @Description: * @Date 上午 9:58 2019/6/28 0028 */ public class PoiWriter implements Runnable { private final CountDownLatch doneSignal; private PoiExcel poiExcel; public PoiWriter(CountDownLatch doneSignal,PoiExcel poiExcel) { this.doneSignal = doneSignal; this.poiExcel = poiExcel; } @Override public void run() { try { while (true) { if(doneSignal.getCount()


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有