您的位置:首页 > 科技 > 能源 > 创建网站用什么语言_中国住房和城乡建设厅官网_六年级上册数学优化设计答案_排名优化公司哪家效果好

创建网站用什么语言_中国住房和城乡建设厅官网_六年级上册数学优化设计答案_排名优化公司哪家效果好

2024/11/17 16:48:21 来源:https://blog.csdn.net/qq798867485/article/details/142390040  浏览:    关键词:创建网站用什么语言_中国住房和城乡建设厅官网_六年级上册数学优化设计答案_排名优化公司哪家效果好
创建网站用什么语言_中国住房和城乡建设厅官网_六年级上册数学优化设计答案_排名优化公司哪家效果好

springboot整合springbatch和xxljob实现定时数据同步(完整代码)

前言:公司一些老项目的定时任务使用spring自带的调度器来完成数据同步的,久而久之带来很多的不方便。首先没办法快速定位监控定时任务的执行情况,其次就是在做数据同步处理的时候,遇到业务比较复杂,数据量比较大的情况,会经常执行非常慢,超时导致定时任务执行失败。基于上诉的两种需求,公司决定将老项目的定时任务进行重构和迁移,然后我们组长架构选型是使用xxljob调度器和springbatch批处理两个轻量级框架,因为之前对springbatch不是很熟悉,趁此机会学习记录下。

之前我就写过文章springboot整合xxljob使用,现在是在此基础上再整合springbatch实现批处理逻辑,如果对xxljob不是很熟悉可以先看这个:https://blog.csdn.net/qq798867485/article/details/131423174

当然要使用xxljob首先要搭建任务调度中心,这个我也写过一篇文章:https://blog.csdn.net/qq798867485/article/details/131415408

言归正传,我们学习任意一个新技术,首先离不开官方文档,springbatch官方文档地址:https://docs.spring.io/spring-batch/docs/4.3.10/reference/html/

Spring Batch是一个轻量级的综合批处理框架,旨在实现对企业系统的日常运作至关重要的强大批处理应用的开发。Spring Batch不是一个调度框架。Spring Batch旨在与调度器一起工作,而不是取代调度器。

1. springbatch的基本
1.1 基础架构

在这里插入图片描述

这是springbatch的基本架构图,无论多复杂的业务都离不开这个架构。

在这里插入图片描述

在 Spring Batch 中,Job 只是一个 Step 实例的容器。 JobExecution是运行过程中实际发生的事情的主要存储机制,它包含了许多必须被控制和持久化的属性,JobExecution的ExecutionContext可以实现数据在不同Step传递.

在这里插入图片描述

每个 Job 完全由一个或多个步骤(step)组成。一个 Step 包含定义和控制实际批处理的所有必要信息.StepExecution的ExecutionContext是不可以在不同Step传递数据的。 ItemReader对一个 Step 的输入是对象是一个个读取, ItemProcessor也是对一个个item单独处理业务逻辑,而 ItemWriter每次都是一批或一大批的item来处理输出数据的。

1.2 数据表

springbatch有9张表,依赖包自己提供了相关的建表语句脚本,这些脚本位于 org.springframework.batch.core 包中。其中有六张表是比较重要的,我们可以通过这些表数据了解批处理的执行状态和结果,方便追踪日志,多次批处理等。

BATCH_JOB_INSTANCE :保存与 JobInstance 相关的所有信息,是整个层次结构的顶层

BATCH_JOB_EXECUTION_PARAMS:保存与 JobParameters 对象相关的所有信息。它包含传递给 Job 的 0 个或多个键/值对,可作为 job 运行时的参数记录。对于每个有助于生成 job 标识的参数,IDENTIFYING 标志都会被设置为 true

BATCH_JOB_EXECUTION:保存与 JobExecution 对象相关的所有信息。每次运行一个 Job 时,该表中总会有一个新的被称为 JobExecution 的对象和一行新记录。

BATCH_STEP_EXECUTION :保存与 StepExecution 对象相关的所有信息。该表在很多方面都与 BATCH_JOB_EXECUTION 表相似,每个 Step 创建的每个 JobExecution 总是至少有一个条目。

BATCH_JOB_EXECUTION_CONTEXT :保存与 Job ExecutionContext 相关的所有信息。每个 JobExecution 都有一个 Job ExecutionContext ,它包含特定 job 执行所需的所有 job 级数据。这些数据通常代表 job 失败后必须检索的状态,以便 JobInstance 可以 “从原处开始”。

BATCH_STEP_EXECUTION_CONTEXT :保存与 StepExecutionContext 相关的所有信息。每个 StepExecution 都有一个 ExecutionContext,它包含特定 step execution 需要持久化的所有数据。这些数据通常代表失败后必须检索的状态,以便 JobInstance 可以 “从原处开始”。

2. springboot整合springbatch
2.1 引入springbatch依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
2.2 在yml文件

关闭项目启动时,自动启动job,其次项目不配置数据库表自动生成,因为在启动项目之前,就在对应的数据库建好相关的表,建表脚本可以到org.springframework.batch.core取相关数据库的脚本,我这边使用的mysql数据库,对应的建表脚本如下

spring:batch:job:enabled: false

建表脚本:

-- Autogenerated: do not edit this fileCREATE TABLE BATCH_JOB_INSTANCE  (JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_NAME VARCHAR(100) NOT NULL,JOB_KEY VARCHAR(32) NOT NULL,constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION  (JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,VERSION BIGINT  ,JOB_INSTANCE_ID BIGINT NOT NULL,CREATE_TIME DATETIME(6) NOT NULL,START_TIME DATETIME(6) DEFAULT NULL ,END_TIME DATETIME(6) DEFAULT NULL ,STATUS VARCHAR(10) ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED DATETIME(6),JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (JOB_EXECUTION_ID BIGINT NOT NULL ,TYPE_CD VARCHAR(6) NOT NULL ,KEY_NAME VARCHAR(100) NOT NULL ,STRING_VAL VARCHAR(250) ,DATE_VAL DATETIME(6) DEFAULT NULL ,LONG_VAL BIGINT ,DOUBLE_VAL DOUBLE PRECISION ,IDENTIFYING CHAR(1) NOT NULL ,constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION  (STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,VERSION BIGINT NOT NULL,STEP_NAME VARCHAR(100) NOT NULL,JOB_EXECUTION_ID BIGINT NOT NULL,START_TIME DATETIME(6) NOT NULL ,END_TIME DATETIME(6) DEFAULT NULL ,STATUS VARCHAR(10) ,COMMIT_COUNT BIGINT ,READ_COUNT BIGINT ,FILTER_COUNT BIGINT ,WRITE_COUNT BIGINT ,READ_SKIP_COUNT BIGINT ,WRITE_SKIP_COUNT BIGINT ,PROCESS_SKIP_COUNT BIGINT ,ROLLBACK_COUNT BIGINT ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED DATETIME(6),constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL,UNIQUE_KEY CHAR(1) NOT NULL,constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
2.3 在启动类加上注解,保证springbatch的类被注入spring容器内
@EnableBatchProcessing
2.4 编写批处理作业进行

模拟一个场景,从文件中读取数据,同步到数据库表A,生成50万条数据

编写相关同步批处理作业

@Configuration
public class CsvToDbJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;@Autowiredprivate UserService userService;@Value("${job.data.path}")private String path;// 定义itemReader@Beanpublic FlatFileItemReader<User> csvToDbItemReader(){return new FlatFileItemReaderBuilder<User>().name("csvToDbItemReader").saveState(false).resource(new PathResource(new File(path,"user.csv").getAbsolutePath())).delimited().names("id","name","age","addr").targetType(User.class).build();}// 定义itemWriter@Beanpublic MyBatisBatchItemWriter<User> csvToDbItemWriter(){return new MyBatisBatchItemWriterBuilder<User>().sqlSessionFactory(sqlSessionFactory).statementId("com.example.springbatch.mapper.UserMapper.saveUserTemp").build();}// 定义Step@Beanpublic Step csvToDbStep(){return stepBuilderFactory.get("csvToDbStep").<User,User>chunk(10000).reader(csvToDbItemReader())
//                .writer(userService::saveUserTempBatch) // 使用 UserService 执行批量插入.writer(csvToDbItemWriter()).taskExecutor(new SimpleAsyncTaskExecutor()).build();}// 定义Job@Beanpublic Job csvToDbJob(){return jobBuilderFactory.get("csvToDbJob").start(csvToDbStep()).incrementer(new RunIdIncrementer()).listener(new CsvToDbJobListener()).build();}
}

模拟另一个场景,从数据库中A表中读取数据,同步到数据库表B生成数据。使用异步分区处理方式来生成50w条数据,

package com.example.springbatch.job.config;import com.example.springbatch.entity.User;
import com.example.springbatch.job.listener.CsvToDbJobListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;import java.util.HashMap;
import java.util.Map;/*** @Description* @Author GoryLee* @Date 2024/9/18*/
@Configuration
@Slf4j
public class DbToDbJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate SqlSessionFactory sqlSessionFactory;// 从步骤@Bean@StepScopepublic MyBatisPagingItemReader<User> dbToDbItemReader(@Value("#{stepExecutionContext[from]}") Integer from,@Value("#{stepExecutionContext[to]}") Integer to,@Value("#{stepExecutionContext[range]}") Integer range){log.info("------MyBatisPagingItemReader开始----from:"+from+"----to:"+to+"-----每片数量:"+range);MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();itemReader.setSqlSessionFactory(sqlSessionFactory);itemReader.setPageSize(1000);itemReader.setQueryId("com.example.springbatch.mapper.UserMapper.selectUserTempList");Map<String,Object> map = new HashMap<>();map.put("from",from);map.put("to",to);itemReader.setParameterValues(map);return itemReader;}@Beanpublic MyBatisBatchItemWriter<User> dbToDbItemWriter(){return new MyBatisBatchItemWriterBuilder<User>().sqlSessionFactory(sqlSessionFactory).statementId("com.example.springbatch.mapper.UserMapper.saveUser").build();}// 定义Step@Beanpublic Step workerStep(){return stepBuilderFactory.get("dbToDb_workerStep").<User,User>chunk(10000).reader(dbToDbItemReader(null,null,null)).writer(dbToDbItemWriter()).taskExecutor(new SimpleAsyncTaskExecutor()).build();}// 分区器@Beanpublic Partitioner dbToDbPartitioner(){return new DbToDbPartitioner();}// 分区处理器@Beanpublic PartitionHandler dbToDbPartitionerHandler(){TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(50);handler.setStep(workerStep());handler.setTaskExecutor(new SimpleAsyncTaskExecutor());try {handler.afterPropertiesSet();} catch (Exception e) {log.error(e.getMessage());}return handler;}// 主步骤@Beanpublic Step masterStep(){return stepBuilderFactory.get("dbToDb_masterStep").partitioner(workerStep().getName(),dbToDbPartitioner()).partitionHandler(dbToDbPartitionerHandler()).build();}// 作业@Beanpublic Job dbToDbJob(){return jobBuilderFactory.get("dbToDbJob").start(masterStep()).incrementer(new RunIdIncrementer()).build();}
}

分区器

package com.example.springbatch.job.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;import java.util.HashMap;
import java.util.Map;/*** @Description 分区器设置,把从步骤的from,to,和range 3个设置到从步骤上下文中* @Author GoryLee* @Date 2024/9/18*/
@Slf4j
public class DbToDbPartitioner implements Partitioner {@Overridepublic Map<String, ExecutionContext> partition(int girdSize) {String text = "---DbToDbPartitioner--第%s分区-----开始:%s-----结束:%s ---- 数据量:%s------";Map<String,ExecutionContext> map = new HashMap<>();int from = 1;int to = 10000;int range = 10000;for (int i = 0; i < girdSize; i++) {log.info(String.format(text,i,from,to,(to-from+1)));ExecutionContext executionContext = new ExecutionContext();executionContext.putInt("from",from);executionContext.putInt("to",to);executionContext.putInt("range",range);to += range;from +=range;map.put("partitioner_"+i,executionContext);}return map;}
}
2.5 使用restful的方式启动批处理作业
package com.example.springbatch.controller;import com.example.springbatch.mapper.UserMapper;
import example.common.model.Result;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.net.URISyntaxException;
import java.util.Date;/*** @Description* @Author GoryLee* @Date 2024/9/14*/
@RestController
@RequestMapping("/job")
@Slf4j
public class JobController {@Autowiredprivate UserMapper userMapper;@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("csvToDbJob")private Job csvToDbJob;@Autowired@Qualifier("dbToDbJob")private Job dbToDbJob;@Autowiredprivate JobExplorer jobExplorer;@GetMapping("/csvToDb")public Result<BatchStatus> csvToDb() throws Exception{userMapper.truncateAllTemp();JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer).addLong("time", System.currentTimeMillis()).getNextJobParameters(csvToDbJob).toJobParameters();JobExecution run = jobLauncher.run(csvToDbJob, jobParameters);return Result.createSuccess(run.getStatus());}@GetMapping("/dbToDb")public Result<BatchStatus> dbToDb() throws Exception{userMapper.truncateAll();JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer).addLong("time", System.currentTimeMillis()).getNextJobParameters(dbToDbJob).toJobParameters();JobExecution run = jobLauncher.run(dbToDbJob, jobParameters);return Result.createSuccess(run.getStatus());}
}
2.6 使用xxljob调度的方式启动批处理作业
package com.example.springbatch.job;import com.example.springbatch.service.UserService;
import com.xxl.job.core.handler.annotation.XxlJob;
import example.common.model.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;/*** @Description* @Author GoryLee* @Date 2023/6/27*/
@Component
@Slf4j
public class DemoJob {@Autowiredprivate UserService userService;@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("dbToDbJob")private Job dbToDbJob;@Autowiredprivate JobExplorer jobExplorer;@XxlJob("dbToDbJobHandler")public void dbToDbJobHandler() throws Exception {log.info(">>>>>>>>>> BEAN模式(类形式) 开始 >>>>>>>>>>>>>>>");userService.truncateAll();JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer).addLong("time", System.currentTimeMillis()).getNextJobParameters(dbToDbJob).toJobParameters();JobExecution run = jobLauncher.run(dbToDbJob, jobParameters);log.info(">>>>>>>>>> 作业调度完成,状态:"+run.getStatus()+" >>>>>>>>>>>>>>>");log.info(">>>>>>>>>> BEAN模式(类形式) 成功 >>>>>>>>>>>>>>>");}
}
3. 总结

我这里只是简单的描述了整合使用,很多springbatch的概念是没有说清楚的,其次文章中很多代码为什么这么用我是没有说明的,所以想要更深入了解学习springbatch,还是到官方文档阅读学习:https://docs.spring.io/spring-batch/docs/4.3.10/reference/html/

另外文章中的两个简单场景,有完整代码的demo,如有需要运行,就自己去下载:https://github.com/gorylee/learnDemo/tree/master/springbatchDemo

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com