springboot整合springbatch和xxljob实现定时数据同步(完整代码)
前言:公司一些老项目的定时任务使用spring自带的调度器来完成数据同步的,久而久之带来很多的不方便。首先没办法快速定位监控定时任务的执行情况,其次就是在做数据同步处理的时候,遇到业务比较复杂,数据量比较大的情况,会经常执行非常慢,超时导致定时任务执行失败。基于上诉的两种需求,公司决定将老项目的定时任务进行重构和迁移,然后我们组长架构选型是使用xxljob调度器和springbatch批处理两个轻量级框架,因为之前对springbatch不是很熟悉,趁此机会学习记录下。
之前我就写过文章springboot整合xxljob使用,现在是在此基础上再整合springbatch实现批处理逻辑,如果对xxljob不是很熟悉可以先看这个:https://blog.csdn.net/qq798867485/article/details/131423174
当然要使用xxljob首先要搭建任务调度中心,这个我也写过一篇文章:https://blog.csdn.net/qq798867485/article/details/131415408
言归正传,我们学习任意一个新技术,首先离不开官方文档,springbatch官方文档地址:https://docs.spring.io/spring-batch/docs/4.3.10/reference/html/
Spring Batch是一个轻量级的综合批处理框架,旨在实现对企业系统的日常运作至关重要的强大批处理应用的开发。Spring Batch不是一个调度框架。Spring Batch旨在与调度器一起工作,而不是取代调度器。
1. springbatch的基本
1.1 基础架构
这是springbatch的基本架构图,无论多复杂的业务都离不开这个架构。
在 Spring Batch 中,
Job
只是一个Step
实例的容器。 JobExecution是运行过程中实际发生的事情的主要存储机制,它包含了许多必须被控制和持久化的属性,JobExecution的ExecutionContext可以实现数据在不同Step传递.
每个
Job
完全由一个或多个步骤(step)组成。一个Step
包含定义和控制实际批处理的所有必要信息.StepExecution的ExecutionContext是不可以在不同Step传递数据的。 ItemReader对一个Step
的输入是对象是一个个读取, ItemProcessor也是对一个个item单独处理业务逻辑,而 ItemWriter每次都是一批或一大批的item来处理输出数据的。
1.2 数据表
springbatch有9张表,依赖包自己提供了相关的建表语句脚本,这些脚本位于
org.springframework.batch.core
包中。其中有六张表是比较重要的,我们可以通过这些表数据了解批处理的执行状态和结果,方便追踪日志,多次批处理等。BATCH_JOB_INSTANCE :保存与
JobInstance
相关的所有信息,是整个层次结构的顶层BATCH_JOB_EXECUTION_PARAMS:保存与
JobParameters
对象相关的所有信息。它包含传递给Job
的 0 个或多个键/值对,可作为 job 运行时的参数记录。对于每个有助于生成 job 标识的参数,IDENTIFYING
标志都会被设置为true
。BATCH_JOB_EXECUTION:保存与
JobExecution
对象相关的所有信息。每次运行一个Job
时,该表中总会有一个新的被称为JobExecution
的对象和一行新记录。BATCH_STEP_EXECUTION :保存与
StepExecution
对象相关的所有信息。该表在很多方面都与BATCH_JOB_EXECUTION
表相似,每个Step
创建的每个JobExecution
总是至少有一个条目。BATCH_JOB_EXECUTION_CONTEXT :保存与
Job
ExecutionContext
相关的所有信息。每个JobExecution
都有一个Job
ExecutionContext
,它包含特定 job 执行所需的所有 job 级数据。这些数据通常代表 job 失败后必须检索的状态,以便JobInstance
可以 “从原处开始”。BATCH_STEP_EXECUTION_CONTEXT :保存与
Step
的ExecutionContext
相关的所有信息。每个StepExecution
都有一个ExecutionContext
,它包含特定 step execution 需要持久化的所有数据。这些数据通常代表失败后必须检索的状态,以便JobInstance
可以 “从原处开始”。
2. springboot整合springbatch
2.1 引入springbatch依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
2.2 在yml文件
关闭项目启动时,自动启动job,其次项目不配置数据库表自动生成,因为在启动项目之前,就在对应的数据库建好相关的表,建表脚本可以到org.springframework.batch.core取相关数据库的脚本,我这边使用的mysql数据库,对应的建表脚本如下
spring:batch:job:enabled: false
建表脚本:
-- Autogenerated: do not edit this fileCREATE TABLE BATCH_JOB_INSTANCE (JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_NAME VARCHAR(100) NOT NULL,JOB_KEY VARCHAR(32) NOT NULL,constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_INSTANCE_ID BIGINT NOT NULL,CREATE_TIME DATETIME(6) NOT NULL,START_TIME DATETIME(6) DEFAULT NULL ,END_TIME DATETIME(6) DEFAULT NULL ,STATUS VARCHAR(10) ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED DATETIME(6),JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (JOB_EXECUTION_ID BIGINT NOT NULL ,TYPE_CD VARCHAR(6) NOT NULL ,KEY_NAME VARCHAR(100) NOT NULL ,STRING_VAL VARCHAR(250) ,DATE_VAL DATETIME(6) DEFAULT NULL ,LONG_VAL BIGINT ,DOUBLE_VAL DOUBLE PRECISION ,IDENTIFYING CHAR(1) NOT NULL ,constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT NOT NULL,STEP_NAME VARCHAR(100) NOT NULL,JOB_EXECUTION_ID BIGINT NOT NULL,START_TIME DATETIME(6) NOT NULL ,END_TIME DATETIME(6) DEFAULT NULL ,STATUS VARCHAR(10) ,COMMIT_COUNT BIGINT ,READ_COUNT BIGINT ,FILTER_COUNT BIGINT ,WRITE_COUNT BIGINT ,READ_SKIP_COUNT BIGINT ,WRITE_SKIP_COUNT BIGINT ,PROCESS_SKIP_COUNT BIGINT ,ROLLBACK_COUNT BIGINT ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED DATETIME(6),constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
2.3 在启动类加上注解,保证springbatch的类被注入spring容器内
@EnableBatchProcessing
2.4 编写批处理作业进行
模拟一个场景,从文件中读取数据,同步到数据库表A,生成50万条数据
编写相关同步批处理作业
@Configuration
public class CsvToDbJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;@Autowiredprivate UserService userService;@Value("${job.data.path}")private String path;// 定义itemReader@Beanpublic FlatFileItemReader<User> csvToDbItemReader(){return new FlatFileItemReaderBuilder<User>().name("csvToDbItemReader").saveState(false).resource(new PathResource(new File(path,"user.csv").getAbsolutePath())).delimited().names("id","name","age","addr").targetType(User.class).build();}// 定义itemWriter@Beanpublic MyBatisBatchItemWriter<User> csvToDbItemWriter(){return new MyBatisBatchItemWriterBuilder<User>().sqlSessionFactory(sqlSessionFactory).statementId("com.example.springbatch.mapper.UserMapper.saveUserTemp").build();}// 定义Step@Beanpublic Step csvToDbStep(){return stepBuilderFactory.get("csvToDbStep").<User,User>chunk(10000).reader(csvToDbItemReader())
// .writer(userService::saveUserTempBatch) // 使用 UserService 执行批量插入.writer(csvToDbItemWriter()).taskExecutor(new SimpleAsyncTaskExecutor()).build();}// 定义Job@Beanpublic Job csvToDbJob(){return jobBuilderFactory.get("csvToDbJob").start(csvToDbStep()).incrementer(new RunIdIncrementer()).listener(new CsvToDbJobListener()).build();}
}
模拟另一个场景,从数据库中A表中读取数据,同步到数据库表B生成数据。使用异步分区处理方式来生成50w条数据,
package com.example.springbatch.job.config;import com.example.springbatch.entity.User;
import com.example.springbatch.job.listener.CsvToDbJobListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;import java.util.HashMap;
import java.util.Map;/*** @Description* @Author GoryLee* @Date 2024/9/18*/
@Configuration
@Slf4j
public class DbToDbJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;// 从步骤@Bean@StepScopepublic MyBatisPagingItemReader<User> dbToDbItemReader(@Value("#{stepExecutionContext[from]}") Integer from,@Value("#{stepExecutionContext[to]}") Integer to,@Value("#{stepExecutionContext[range]}") Integer range){log.info("------MyBatisPagingItemReader开始----from:"+from+"----to:"+to+"-----每片数量:"+range);MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();itemReader.setSqlSessionFactory(sqlSessionFactory);itemReader.setPageSize(1000);itemReader.setQueryId("com.example.springbatch.mapper.UserMapper.selectUserTempList");Map<String,Object> map = new HashMap<>();map.put("from",from);map.put("to",to);itemReader.setParameterValues(map);return itemReader;}@Beanpublic MyBatisBatchItemWriter<User> dbToDbItemWriter(){return new MyBatisBatchItemWriterBuilder<User>().sqlSessionFactory(sqlSessionFactory).statementId("com.example.springbatch.mapper.UserMapper.saveUser").build();}// 定义Step@Beanpublic Step workerStep(){return stepBuilderFactory.get("dbToDb_workerStep").<User,User>chunk(10000).reader(dbToDbItemReader(null,null,null)).writer(dbToDbItemWriter()).taskExecutor(new SimpleAsyncTaskExecutor()).build();}// 分区器@Beanpublic Partitioner dbToDbPartitioner(){return new DbToDbPartitioner();}// 分区处理器@Beanpublic PartitionHandler dbToDbPartitionerHandler(){TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(50);handler.setStep(workerStep());handler.setTaskExecutor(new SimpleAsyncTaskExecutor());try {handler.afterPropertiesSet();} catch (Exception e) {log.error(e.getMessage());}return handler;}// 主步骤@Beanpublic Step masterStep(){return stepBuilderFactory.get("dbToDb_masterStep").partitioner(workerStep().getName(),dbToDbPartitioner()).partitionHandler(dbToDbPartitionerHandler()).build();}// 作业@Beanpublic Job dbToDbJob(){return jobBuilderFactory.get("dbToDbJob").start(masterStep()).incrementer(new RunIdIncrementer()).build();}
}
分区器
package com.example.springbatch.job.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;import java.util.HashMap;
import java.util.Map;/*** @Description 分区器设置,把从步骤的from,to,和range 3个设置到从步骤上下文中* @Author GoryLee* @Date 2024/9/18*/
@Slf4j
public class DbToDbPartitioner implements Partitioner {@Overridepublic Map<String, ExecutionContext> partition(int girdSize) {String text = "---DbToDbPartitioner--第%s分区-----开始:%s-----结束:%s ---- 数据量:%s------";Map<String,ExecutionContext> map = new HashMap<>();int from = 1;int to = 10000;int range = 10000;for (int i = 0; i < girdSize; i++) {log.info(String.format(text,i,from,to,(to-from+1)));ExecutionContext executionContext = new ExecutionContext();executionContext.putInt("from",from);executionContext.putInt("to",to);executionContext.putInt("range",range);to += range;from +=range;map.put("partitioner_"+i,executionContext);}return map;}
}
2.5 使用restful的方式启动批处理作业
package com.example.springbatch.controller;import com.example.springbatch.mapper.UserMapper;
import example.common.model.Result;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.net.URISyntaxException;
import java.util.Date;/*** @Description* @Author GoryLee* @Date 2024/9/14*/
@RestController
@RequestMapping("/job")
@Slf4j
public class JobController {@Autowiredprivate UserMapper userMapper;@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("csvToDbJob")private Job csvToDbJob;@Autowired@Qualifier("dbToDbJob")private Job dbToDbJob;@Autowiredprivate JobExplorer jobExplorer;@GetMapping("/csvToDb")public Result<BatchStatus> csvToDb() throws Exception{userMapper.truncateAllTemp();JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer).addLong("time", System.currentTimeMillis()).getNextJobParameters(csvToDbJob).toJobParameters();JobExecution run = jobLauncher.run(csvToDbJob, jobParameters);return Result.createSuccess(run.getStatus());}@GetMapping("/dbToDb")public Result<BatchStatus> dbToDb() throws Exception{userMapper.truncateAll();JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer).addLong("time", System.currentTimeMillis()).getNextJobParameters(dbToDbJob).toJobParameters();JobExecution run = jobLauncher.run(dbToDbJob, jobParameters);return Result.createSuccess(run.getStatus());}
}
2.6 使用xxljob调度的方式启动批处理作业
package com.example.springbatch.job;import com.example.springbatch.service.UserService;
import com.xxl.job.core.handler.annotation.XxlJob;
import example.common.model.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;/*** @Description* @Author GoryLee* @Date 2023/6/27*/
@Component
@Slf4j
public class DemoJob {@Autowiredprivate UserService userService;@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("dbToDbJob")private Job dbToDbJob;@Autowiredprivate JobExplorer jobExplorer;@XxlJob("dbToDbJobHandler")public void dbToDbJobHandler() throws Exception {log.info(">>>>>>>>>> BEAN模式(类形式) 开始 >>>>>>>>>>>>>>>");userService.truncateAll();JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer).addLong("time", System.currentTimeMillis()).getNextJobParameters(dbToDbJob).toJobParameters();JobExecution run = jobLauncher.run(dbToDbJob, jobParameters);log.info(">>>>>>>>>> 作业调度完成,状态:"+run.getStatus()+" >>>>>>>>>>>>>>>");log.info(">>>>>>>>>> BEAN模式(类形式) 成功 >>>>>>>>>>>>>>>");}
}
3. 总结
我这里只是简单的描述了整合使用,很多springbatch的概念是没有说清楚的,其次文章中很多代码为什么这么用我是没有说明的,所以想要更深入了解学习springbatch,还是到官方文档阅读学习:https://docs.spring.io/spring-batch/docs/4.3.10/reference/html/
另外文章中的两个简单场景,有完整代码的demo,如有需要运行,就自己去下载:https://github.com/gorylee/learnDemo/tree/master/springbatchDemo