整体思路

整体思路很简单,就是在文件读取和数据多线程处理这两步发力

  • Excel数据分片读取
  • 线程池异步处理数据
  • Mybatis-Plus批量存储

实现过程

使用EasyExcel分片读取Excel大文件

https://easyexcel.opensource.alibaba.com/

参照EasyExcel官方文档,实现自己的读文件监听器,只需要新建自己的监听器类并实现ReadListener 接口,为了通用性,可以使用泛型<T>,然后实现监听器方法。

其中BATCH_COUNT指定每个分片的数据条数,具体数值可以最后调试的时候自己测试,找到最佳的数据量,需要考虑到Mysql一条语句批量插入的数据条数太大也会影响速度。

invoke方法指定每个分片数据的处理方式,doAfterAllAnalysed方法与invoke 方法保持一致即可,它处理的就是文件最后剩余的不足一个分片的数据。

代码如下:

/**
 * @author xg BLACK
 * @date 2022/11/24 23:28
 * description: EXCEL页面读取监听器
 */
@Slf4j
public class GeoPageReadListener<T> implements ReadListener<T> {
    /**
     * 单个处理数据量,经测试1000条数据效果较好
     */
    public static int BATCH_COUNT = 1000;

    /**
     * 数据的临时存储
     */
    private List<T> cachedDataList = new ArrayList<>(BATCH_COUNT);

    /**
     * consumer
     */
    private final Consumer<List<T>> consumer;

    public GeoPageReadListener(Consumer<List<T>> consumer) {
        this.consumer = consumer;
    }


    @Override
    public void invoke(T data, AnalysisContext context) {
        cachedDataList.add(data);
        if (cachedDataList.size() >= BATCH_COUNT) {
            log.info("读取数据量:{}", cachedDataList.size());
            consumer.accept(cachedDataList);
            cachedDataList = new ArrayList(BATCH_COUNT);
        }
    }

    /**
     * 所有数据读取完成之后调用
     *
     * @param context
     */
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        if (CollectionUtils.isNotEmpty(cachedDataList)) {
            //处理剩余的数据
            log.info("读取数据量:{}", cachedDataList.size());
            consumer.accept(cachedDataList);
        }
    }
}

定义线程池

定义的线程池相当于是8个核心线程,任务队列是无界队列。线程数也是要在最后测试的时候取一个合适的值。

/**
 * @author xg BLACK
 * @date 2022/11/25 03:18
 * description:
 */
@Configuration
public class GeoAsyncConfiguration {

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(8);
        // 设置最大线程数,当核心线程数满了且队列满了,会创建新的线程,直到达到最大线程数。而当前队列是无界队列,所以不会满,此处设置无效。直接使用默认值,无限大
        //executor.setMaxPoolSize(16);
        // 设置队列容量,默认为Integer.MAX_VALUE
        //executor.setQueueCapacity(10000);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("geo-admin-thread-");
        // 设置拒绝策略。无界队列不会出现满的情况,所以不配置存储策略。直接抛出 RejectedExecutionException 异常
        //executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

实现真正的批量插入

经过查看日志发现Mybatis-Plus的saveBatch在最后还是循环调用的INSERT INTO语句,这种情况下,测试多线程速度和单线程相差不大,所以需要实现真正的批量插入语句,两种方式,一种是在xml文件中自己拼接SQL语句,一种是通过给Mybatis-Plus注入器,增强批量插入。

这里是用的是增强注入器的方式,最后执行批量插入的时候是将所有数据拼接到一条SQL里面了。

/**
 * @author xg BLACK
 * @date 2022/11/25 11:00
 * description: mybatis-plus SQL注入器,增强批量插入
 */
@Component
public class BatchSqlInjector extends DefaultSqlInjector {
    @Override
    public List<AbstractMethod> getMethodList(Class<?> mapperClass, TableInfo tableInfo) {
        //MyBatis-Plus默认方法
        List<AbstractMethod> methodList = super.getMethodList(mapperClass, tableInfo);
        //注入InsertBatchSomeColumn。官方标注此处只通过MySQL测试,其他数据库未测试 https://gitee.com/baomidou/mybatis-plus/blob/3.0/mybatis-plus-extension/src/main/java/com/baomidou/mybatisplus/extension/injector/methods/InsertBatchSomeColumn.java
        //在当前当前项目中,pg库测试可用
        //例1: t -> !t.isLogicDelete() , 表示不要逻辑删除字段
        //例2: t -> !t.getProperty().equals("version") , 表示不要字段名为 version 的字段
        //例3: t -> t.getFieldFill() != FieldFill.UPDATE) , 表示不要填充策略为 UPDATE 的字段
        methodList.add(new InsertBatchSomeColumn(t -> !t.getProperty().equals("version")));
        return methodList;
    }
}

自定义SpiceBaseMapper,并继承BaseMapper,这样在BaseMapper功能都能实现的基础上,在SpiceBaseMapper上增加新的批量插入方法。

/**
 * @author xg BLACK
 * @date 2022/11/25 11:06
 * description:
 */
@Service
@Component
public interface SpiceBaseMapper<T> extends BaseMapper<T> {
    /**
     *  批量插入
     * @param entityList
     * @return
     */
    int insertBatchSomeColumn(List<T> entityList);
}

接下来,像使用BaseMapper一样使用就可以

/**
 * @author xg BLACK
 * @date 2022/11/25 11:08
 * description:
 */
@Mapper
@Component
public interface DataSwMbqcdssBatchInsertMapper extends SpiceBaseMapper<DataSwMbqcdssDO> {

}

Controller中接口

Controller中接口

@PostMapping("/create-sw-mbqcdss")
//@ApiOperation("创建数据源并导入密闭墙点实时数据")//swagger注解,可以删掉
//@PreAuthorize("@ss.hasPermission('geo:function-datasource:create')")//校验接口权限,可以删掉
public CommonResult<DataExcelImportRespVO> createFunctionDatasourceAndSwMbqcdss(@Valid FunctionDatasourceCreateReqVO createReqVO, @RequestParam("file") MultipartFile file) throws IOException, ExecutionException, InterruptedException {
    long filesize = file.getSize();
    log.info("导入文件大小:{}k",filesize / 1024);
    return success(functionDatasourceService.createFunctionDatasourceAndSwMbqcdss(createReqVO,file));
}

Service接口

/**
 * 创建数据源密闭墙点实时数据
 *
 * @param createReqVO 创建信息
 * @param file 要保存的数据
 * @return 导出响应对象
 */
DataExcelImportRespVO createFunctionDatasourceAndSwMbqcdss(@Valid FunctionDatasourceCreateReqVO createReqVO, MultipartFile file) throws IOException, ExecutionException, InterruptedException;

ServiceImpl实现类

@Override
@Transactional(rollbackFor = Exception.class) // 添加事务,异常则回滚所有导入
public DataExcelImportRespVO createFunctionDatasourceAndSwMbqcdss(FunctionDatasourceCreateReqVO createReqVO, MultipartFile file) throws IOException, ExecutionException, InterruptedException {
    //新增数据源,或者id
    FunctionDatasourceDO functionDatasource = FunctionDatasourceConvert.INSTANCE.convert(createReqVO);
    functionDatasourceMapper.insert(functionDatasource);
    Long id = functionDatasource.getId();
    //获取当前登录用户
    Long loginUserId = SecurityFrameworkUtils.getLoginUserId();
    //开始计时
    TimeInterval timer = DateUtil.timer();
    //DataExcelImportRespVO respVO = readExcelAndSave(DataSwMbqcdssImportExcelVO.class, file, data -> DataSwMbqcdssConvert.INSTANCE.convert(data).setDsId(id), dataSwMbqcdssInsertMapper::saveBatch);
    DataExcelImportRespVO respVO = readExcelAndSaveAsync(DataSwMbqcdssImportExcelVO.class,
            file,
            data -> {
                DataSwMbqcdssDO newDo = DataSwMbqcdssConvert.INSTANCE.convert(data).setDsId(id);
                //数据预处理,因为目前的批处理方法不会再自动填充数据,所以这里手动填充
                newDo.setCreator(String.valueOf(loginUserId));
                newDo.setUpdater(String.valueOf(loginUserId));
                newDo.setDeleted(false);
                return newDo;
            },
            dataSwMbqcdssBatchInsertMapper::insertBatchSomeColumn);
    //结束计时
    long interval = timer.interval();
    log.info("导入数据共花费:{}s", interval / 1000);
    respVO.setTime(interval);
    // 存储文件
    saveDsFile(id,file);
    return respVO;
}

只关注开始计时到计时结束中间的部分即可,其他都是一些业务需要的处理。

因为需要大数据量导入的数据类不止一个,所以将异步多线程导入数据readExcelAndSaveAsync单独提取出来封装了一个方法。参数分别为hea:Excel导入实体类的class;file:要导入的Excel文件;function:函数型函数式接口,数据处理函数,对数据加工操作(除了业务相关的数据处理操作,如果原本有使用Mybatis-Plus的自动填充,此时需要手动填充,因为自定义注入器实现的批量插入不会自动填充);dbFunction:函数型函数式接口,数据库操作方法,即上面定义的批量插入方法;

泛型<T> Excel导入实体类 例如DataSwMbqcdssImportExcelVO

泛型<R> 数据库实体类 例如DataSwMbqcdssDO

/**
 * 异步多线程导入数据
 * 采用自定义注入mybatis-plus的SQL注入器,实现真正的BatchInsert,但是需要注意的是项目配置文件需要在jdbc的url后面加上rewriteBatchedStatements=true
 * @param head  Excel导入实体类的class
 * @param file  要导入的Excel文件
 * @param function  数据处理函数,对数据加工
 * @param dbFunction  数据库操作
 * @return 导入结果
 * @param <T>  Excel导入实体类   例如DataSwMbqcdssImportExcelVO
 * @param <R>  数据库实体类  例如DataSwMbqcdssDO
 * @throws IOException
 * @throws ExecutionException
 * @throws InterruptedException
 */
private <T,R> DataExcelImportRespVO readExcelAndSaveAsync(Class<T> head, MultipartFile file, Function<T,R> function, Function<List<R>,Integer> dbFunction) throws IOException, ExecutionException, InterruptedException {
    Integer successCount = 0;
    Integer failCount = 0;
    //存储异步线程的执行结果
    Collection<Future<int[]>> futures = new ArrayList<Future<int[]>>();

    EasyExcel.read(file.getInputStream(), head, new GeoPageReadListener<T>(dataList -> {
        //转换DO,并设置数据源id

        List<R> list = dataList.parallelStream().map(function).collect(Collectors.toList());
        //异步批量插入
        futures.add(functionDatasourceAsyncService.saveAsyncBatch(list,dbFunction));
    })).sheet().doRead();
    //等待异步线程执行完毕
    for (Future<int[]> future : futures) {
        int[] counts = future.get();
        successCount += counts[0];
        failCount += counts[1];
    }
    log.info("存储成功总数据量:{},存储失败总数据量:{}", successCount,failCount);
    DataExcelImportRespVO respVO = DataExcelImportRespVO.builder().successCount(successCount).failCount(failCount).build();
    return respVO;
}

DataExcelImportRespVO是导入结果类,里面存储成功数量和失败数量

/**
 * @author xg BLACK
 * @date 2022/11/1 15:20
 * description:
 */
@ApiModel("管理后台 - 数据导入 Response VO")
@Data
@Builder
public class DataExcelImportRespVO {
    @ApiModelProperty(value = "导入成功的数量")
    private Integer successCount;

    @ApiModelProperty(value = "导入失败的数量")
    private Integer failCount;

    @ApiModelProperty(value = "导入用时")
    private Long time;
}

异步存储,使用@Async 异步存储,主线程通过Future<int[]>等待线程返回结果

/**
 * 批量插入
 * @param list  要分批处理的数据
 * @param dbFunction  数据库操作的方法
 * @param <T> 数据库实体类
 * @return 返回处理结果
 */
@Async
public <T> Future<int[]> saveAsyncBatch(List<T> list, Function<List<T>,Integer> dbFunction) {
    int size = list.size();
    int[] result = new int[2];
    log.info("saveAsyncBatch当前数据分片大小 size:{}",size);
    try {
        if (dbFunction.apply(list) > 0) {
            result[0] = size;
            log.info("{} 分片存储数据成功,数据量:{}",Thread.currentThread().getName(), size);
        } else {
            result[1] = size;
            log.info("{} 分片存储数据失败:{}",Thread.currentThread().getName(), size);
        }
    }catch (Exception e){
        result[1] = size;
        log.error("{} 分片存储数据出现异常,{}",Thread.currentThread().getName(),e.getMessage());
    }

    return new AsyncResult<int[]>(result);
}

其他优化

需要根据测试结果慢慢的调整,需要优化的点如下:

  • 分片数据大小
  • 线程数
  • 数据库连接池的连接数
  • 数据库和程序之间的网络通信情况

总结

其实整体思路很简单,其中几个关键点就是多线程和实现真正的批量插入。线程也不是越多越好,过犹不及,合适的才是最好的。