最近有个数据同步的工作,可是批量插入Oracle的时候很慢很慢,数也不多,但就是很慢,为此,写了个多线程的插入代码,作为笔记,防止以后忘记,所以记录一下。
1、创建执行数据插入任务类
import com.byqh.eaewdp.system.dao.xxxMapper;
import com.byqh.eaewdp.system.entity.xxxEntity;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;import java.util.List;/*** DataInsertTask 类实现了 Runnable 接口,用于在多线程环境下执行数据插入任务。* 它封装了数据批次和 SQL 会话工厂,以便在单独的线程中执行数据库插入操作。*/
public class DataInsertTask implements Runnable {// 私有成员变量,用于存储要插入数据库的数据批次private final List<xxxEntity> dataBatch;// 私有成员变量,用于创建 SQL 会话private final SqlSessionFactory sqlSessionFactory;/*** 构造函数,用于初始化 DataInsertTask 实例。** @param dataBatch 要插入数据库的数据批次* @param sqlSessionFactory SQL 会话工厂,用于创建 SQL 会话*/public DataInsertTask(List<xxxEntity> dataBatch, SqlSessionFactory sqlSessionFactory) {this.dataBatch = dataBatch;this.sqlSessionFactory = sqlSessionFactory;}/*** 实现 Runnable 接口的 run 方法。* 在新的线程中执行此方法时,会打开一个 SQL 会话(以批量执行模式),* 执行数据插入操作,并提交会话以保存更改。*/@Overridepublic void run() {try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) {// 从 SQL 会话中获取 TpjTyrzYgxxMapper 的实例xxxMapper mapper = session.getMapper(xxxMapper.class);// 调用 mapper 的 insertBatchSomeColumn 方法来插入数据批次// 假设该方法已经配置为支持批量插入mapper.insertBatchSomeColumn(dataBatch);// 提交会话以保存对数据库的更改session.commit();} catch (Exception e) {// 在这里处理可能的异常,例如记录日志或抛出运行时异常// 注意:在 try-with-resources 语句中,不需要显式关闭 SqlSession,// 因为它会在语句结束时自动关闭(无论是否发生异常)e.printStackTrace();}}}
2、数据分割
/*** 将给定的列表分割成多个子列表,每个子列表包含最多指定数量的元素。** @param list 要分割的原始列表。* @param size 每个子列表(批次)应包含的最大元素数量。* @return 包含所有子列表的列表,其中每个子列表都是原始列表的一部分。*/private List<List<xxxEntity>> partitionList(List<xxxEntity> list, int size) {// 创建一个新的列表来存储分割后的子列表List<List<xxxEntity>> parts = new ArrayList<>();// 获取原始列表的大小final int N = list.size();// 使用for循环,每次迭代处理size个元素for (int i = 0; i < N; i += size) {// 计算当前子列表的结束索引,不超过原始列表的大小int end = Math.min(N, i + size);// 使用subList方法从原始列表中截取一个子列表,并将其添加到parts列表中parts.add(list.subList(i, end));}// 返回包含所有子列表的列表return parts;}
3、并发插入数据
@Autowiredprivate SqlSessionFactory sqlSessionFactory;/*** 并发地插入数据批次。** @param dataList 包含要插入的数据的列表。* @param batchSize 每个批次中的元素数量。* @param threadPoolSize 线程池中的线程数量。*/public void insertDataInBatchesConcurrently(List<xxxEntity> dataList, int batchSize, int threadPoolSize) {// 创建一个固定大小的线程池ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);// 将数据列表分成多个批次List<List<xxxEntity>> batches = partitionList(dataList, batchSize);try {// 遍历每个批次for (List<xxxEntity> batch : batches) {// 为每个批次创建一个数据插入任务DataInsertTask task = new DataInsertTask(batch, sqlSessionFactory);// 提交任务到线程池执行executor.submit(task);}// 关闭线程池,不再接受新任务,但已提交的任务会继续执行executor.shutdown();// 等待线程池中的所有任务完成,最多等待180秒if (!executor.awaitTermination(180, TimeUnit.SECONDS)) {// 需要停止已提交的任务executor.shutdownNow();// 如果超时,则这里的逻辑实际上应该是打印一条错误消息或进行其他处理System.err.println("任务执行超时,但请注意shutdown()不会停止已提交的任务。");} else {// 所有任务成功完成或在超时前完成System.out.println("所有任务完成。");}} catch (InterruptedException e) {// 如果当前线程在等待时被中断,则尝试停止所有正在执行的任务executor.shutdownNow();// 保留中断状态,以便调用者可以决定如何处理Thread.currentThread().interrupt();// 打印中断消息System.err.println("任务执行被中断");}}
4、插入的SQL
<insert id="insertBatchSomeColumn">INSERT ALL<foreach collection="list" item="item" index="index" separator=" ">INTO "TABLE_NAME"("CODE", "NAME")VALUES(#{item.code,jdbcType=VARCHAR},#{item.name,jdbcType=VARCHAR})</foreach>SELECT 1 FROM DUAL</insert>