目录
概述
核心架构
核心特点
应用场景
什么是任务调度
快速入门
获取源码
初始化调度数据库
基本配置
数据源datasource
邮箱email(可选)
会话令牌access token
启动调度中心
启动执行器
依赖
yaml基本配置
XxlJobConfig类配置
定义执行任务
添加执行任务
初级阶段
时间转为Cron表达式工具类
XxlJobRemoteApiUtils工具类
引入远程发送请求依赖
允许远程调用
概述
XXL-Job 是一个轻量级、分布式任务调度平台,由国内技术团队开发并开源。它旨在解决分布式系统中的定时任务调度问题,提供了一整套简单、高效且可靠的解决方案
核心架构
XXL-Job 的架构主要由以下几部分组成:
- 调度中心(Admin):负责任务的管理、调度策略、触发时机以及调度请求的发起。它提供了可视化的 Web 管理界面,方便用户进行任务的增删改查和调度监控。
-
执行器(Executor):部署在业务服务环境中,用于接收调度中心的请求并执行具体的任务逻辑
-
任务代码:由开发者编写的业务逻辑代码,注册到执行器中,由调度中心触发执行
核心特点
-
轻量级设计:核心代码简洁高效,易于集成和部署
-
分布式调度:支持多机分布式部署,可水平扩展,提高系统可用性和负载能力
-
简单易用:提供简洁的 API 和可视化界面,便于任务的创建、管理和监控
-
功能丰富:支持多种任务类型(如定时任务、周期任务、一次性任务),并提供任务分片、失败重试、任务依赖等功能
-
弹性扩缩容:支持动态添加或移除执行器节点,无需停止服务
-
高可用性:通过多节点部署和故障转移机制,确保任务的不中断执行
应用场景
XXL-Job 广泛应用于以下场景:
-
定时任务:如数据备份、报表生成、系统维护等
-
分布式任务处理:支持任务分片并行执行,提高任务处理效率
-
弹性扩缩容:根据业务量动态调整执行器数量,应对业务波动
-
业务流程自动化:实现复杂业务流程的自动化调度
什么是任务调度
我们可以思考一下下面业务场景的解决方案:
- 某电商平台需要每天上午10点,下午3点,晚上8点发放一批优惠券
- 某银行系统需要在信用卡到期还款日的前三天进行短信提醒
- 某财务系统需要在每天凌晨0:10分结算前一天的财务数据,统计汇总
以上场景就是任务调度所需要解决的问题。
任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程。
快速入门
官网: 分布式任务调度平台XXL-JOB
获取源码
源码仓库地址 |
https://github.com/xuxueli/xxl-job |
xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 |
GitCode - 全球开发者的开源社区,开源代码托管平台 |
获取源码解压即可
初始化调度数据库
打开项目我们可以获取到 调度数据库 ,路径为: xxl-job-master/doc/db/tables_xxl_job.sql
使用 数据库连接工具初始化运行即可。
基本配置
数据源datasource
随后打开使用 xxl-job-admin 模块,这个模块就是用于管理我们的调度。并修改我们数据相关配置:
邮箱email(可选)
email 的相关配置,就是 当我们调度执行失败的时候,可以通过 email 进行通知。具体email 的配置,通过个人的邮箱平台相关配置即可。
会话令牌access token
执行器 连接 调度中心 所需要的令牌。
启动调度中心
启动 admin 模块
调度中心访问地址: http://localhost:8080/xxl-job-admin
默认登录账号“admin/123456”,登录后运行界面如下图所示
启动执行器
打开你自己的项目,并进行相关配置。
依赖
<!-- xxl-job --><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.1</version></dependency>
yaml基本配置
dev:xxl:job:admin:### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;addresses: http://localhost:8080/xxl-job-admin### 调度中心通讯TOKEN [选填]:非空时启用;accessToken: default### 调度中心通讯超时时间[选填],单位秒;默认3s;executor:### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册appname: xxl-job-executor-sample### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。address:### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";ip: localhost### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;port: 9999### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;logpath: /data/applogs/xxl-job/jobhandler### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;logretentiondays: 30
XxlJobConfig类配置
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class XxlJobConfig {@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.admin.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {log.info(">>>>>>>>>>> xxl-job config init.");log.info("adminAddress:{}", adminAddresses);log.info("appname:{}", appname);log.info("ip:{}", ip);log.info("port:{}", port);log.info("accessToken:{}", accessToken);log.info("logPath:{}", logPath);log.info("logRetentionDays:{}", logRetentionDays);log.info(">>>>>>>>>>> xxl-job config init finish.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}
随后启动我们自己的项目。
同时查看执行器管理器
可以发现远程注册端口节点成功。
定义执行任务
添加执行任务
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class SimpleXxlJob {@XxlJob("simpleJobHandler") // 注解内的参数为我们运行模式为 Bean 类型对应的 JobHandlerpublic void simpleJobHandler() throws Exception {System.out.println("执行定时任务,执行时间>>>>>>>>>>> xxl-job, Hello World." + new Date());}
}
尝试执行一次。
通过我们执行一次成功后并调用对应方法,即可。可以根据我们自己需求进行启动配置对应的方法了。
初级阶段
有时候我们需要的是,用户使用自己的前端去设置触发的时间。并不是我们去xxl-job-admin 的管理端进行添加定时任务的。
时间转为Cron表达式工具类
以下是我收集的所用到的工具类。可以参考一下。
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;/**** <p>* 将时间转为Cron表达式* </p> ** @author Angindem* @since 2025-03-08**/
public class CronUtils {private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("ss mm HH dd MM ? yyyy");public enum TimeCycle {YEAR, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND}/*** 将LocalDateTime转换为cron表达式的字符串。* @param dateTime 要转换的时间字符串* @param format 要转换的时间格式* @return cron表达式*/public static String toCronExpression(String dateTime, String format) {LocalDateTime localDate = LocalDateTime.parse(dateTime, DateTimeFormatter.ofPattern(format));String date = localDate.format(FORMAT);return date;}/*** 将LocalDateTime转换为cron表达式的字符串。* @param dateTime 要转换的LocalDateTime* @return cron表达式*/public static String toCronExpression(LocalDateTime dateTime) {String date = dateTime.format(FORMAT);return date;}/*** 将多个 LocalDateTime 对象转换为一个 cron 表达式字符串* @param times LocalDateTime 对象列表* @return cron 表达式字符串*/public static String convertToCron(List<LocalDateTime> times) {// 提取秒、分、时、日、月、周几int second = times.get(0).getSecond();int minute = times.get(0).getMinute();List<Integer> hours = new ArrayList<>();List<Integer> daysOfMonth = new ArrayList<>();List<Integer> months = new ArrayList<>();List<Integer> daysOfWeek = new ArrayList<>();for (LocalDateTime time : times) {hours.add(time.getHour());daysOfMonth.add(time.getDayOfMonth());months.add(time.getMonthValue());daysOfWeek.add(time.getDayOfWeek().getValue());}// 构造Cron表达式StringBuilder cron = new StringBuilder();cron.append(second).append(" ");cron.append(minute).append(" ");cron.append(String.join(",", hours.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");cron.append(String.join(",", daysOfMonth.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");cron.append(String.join(",", months.stream().map(Object::toString).collect(Collectors.toList()))).append(" ");cron.append(String.join(",", daysOfWeek.stream().map(Object::toString).collect(Collectors.toList())));return cron.toString();}/*** 将指定的 LocalDateTime 对象转换为 指定周期的 cron 表达式字符串* @param dateTime LocalDateTime 对象* @param timeCycle 时间周期枚举值* @return cron 表达式字符串*/public static String toCronExpression(LocalDateTime dateTime, TimeCycle timeCycle) {String cron = null;switch (timeCycle) {case YEAR:cron = String.format("%d %d %d %d %d ? *", dateTime.getSecond(),dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfMonth(),dateTime.getMonthValue());break;case MONTH:cron = String.format("%d %d %d %d * ? *", dateTime.getSecond(),dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfMonth());break;case WEEK:cron = String.format("%d %d %d ? * %d *", dateTime.getSecond(),dateTime.getMinute(), dateTime.getHour(), dateTime.getDayOfWeek().getValue() % 7);break;case DAY:cron = String.format("%d %d %d * * ? *", dateTime.getSecond(),dateTime.getMinute(), dateTime.getHour());break;case HOUR:cron = String.format("%d %d * * * ? *", dateTime.getSecond(),dateTime.getMinute());break;case MINUTE:cron = String.format("%d * * * * ? *", dateTime.getSecond());break;case SECOND:cron = "0/1 * * * * ? *";break;default:throw new IllegalArgumentException("Unknown time cycle: " + timeCycle);}return cron;}
}
XxlJobRemoteApiUtils工具类
转完 Cron 表达式后,我们可以通过远程调用 xxl-job-admin 的对应接口进行操作添加。
引入远程发送请求依赖
<!--httpclient的坐标用于在java中发起请求--><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency><!--使用fastjson解析json数据 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.42</version></dependency>
以下是我收集并使用的工具类,可以做一下参考。
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pea.mic.domain.po.XxlJobInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;@Slf4j
@Component
public class XxlJobRemoteApiUtils {private static String adminAddresses;private static String appname;private static String accessToken;private final static RestTemplate restTemplate = new RestTemplate();private static final String ADD_URL = "/jobinfo/add";private static final String UPDATE_URL = "/jobinfo/update";private static final String REMOVE_URL = "/jobinfo/remove";private static final String PAUSE_URL = "/jobinfo/pause";private static final String START_URL = "/jobinfo/start";@Autowiredpublic void init(Environment env) {adminAddresses = env.getProperty("xxl.job.admin.addresses");appname = env.getProperty("xxl.job.executor.appname");accessToken = env.getProperty("xxl.job.admin.accessToken");log.info("xxl.job.admin.addresses:{}", adminAddresses);log.info("xxl.job.executor.appname:{}", appname);log.info("xxl.job.accessToken:{}", accessToken);}public static Map getJobInfoByLocalDateTime(LocalDateTime times,String author,String JobDesc,String taskHandler){String cron = CronUtils.toCronExpression(times);XxlJobInfo jobInfo = new XxlJobInfo();jobInfo.setJobGroup(2);jobInfo.setJobDesc(JobDesc);jobInfo.setAuthor(author);jobInfo.setScheduleType("CRON");jobInfo.setMisfireStrategy("DO_NOTHING");//执行时间jobInfo.setScheduleConf(cron);jobInfo.setGlueType("BEAN");jobInfo.setExecutorHandler(taskHandler);jobInfo.setExecutorRouteStrategy("FIRST");jobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");jobInfo.setExecutorTimeout(0);jobInfo.setExecutorFailRetryCount(0);jobInfo.setGlueType("BEAN");jobInfo.setGlueRemark("GLUE代码初始化");jobInfo.setTriggerStatus(0);jobInfo.setTriggerLastTime(0);jobInfo.setTriggerNextTime(0);ObjectMapper objectMapper = new ObjectMapper();Map map = objectMapper.convertValue(jobInfo, Map.class);return map;}public static String add(Map param){return doPost(adminAddresses + ADD_URL, param);}public static String update(String id, String cron){Map param = new HashMap<>();param.put("id", id);param.put("jobCron", cron);return doPost(adminAddresses + UPDATE_URL, param);}public static String remove(String id){Map param = new HashMap<>();param.put("id", id);return doGet(adminAddresses + REMOVE_URL, param);}public static String pause(String id){Map param = new HashMap<>();param.put("id", id);return doGet(adminAddresses + PAUSE_URL, param);}public static String start(String id){Map param = new HashMap<>();param.put("id", id);return doGet(adminAddresses + START_URL, param);}public static String doPost(String url, Map fromData){HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);headers.add("accessToken", accessToken);HttpEntity<Map> entity = new HttpEntity<>(fromData ,headers);log.info(entity.toString());ResponseEntity<String> stringResponseEntity = restTemplate.postForEntity(url, entity, String.class);return stringResponseEntity.getBody().toString();}public static String doGet(String url, Map<String, String> params) {// 创建可关闭的HttpClient,使用try-with-resources确保资源自动关闭try (CloseableHttpClient httpClient = HttpClients.createDefault()) {// 使用URIBuilder来构建带参数的URLURIBuilder uriBuilder = new URIBuilder(url);// 将Map中的参数添加到URL中if (params != null) {for (Map.Entry<String, String> entry : params.entrySet()) {uriBuilder.setParameter(entry.getKey(), entry.getValue());}}// 创建GET请求对象HttpGet httpGet = new HttpGet(uriBuilder.build());httpGet.setHeader("accessToken", accessToken);// 设置请求配置(超时等)
// httpGet.setConfig(buildRequestConfig());// 执行请求并获取响应try (CloseableHttpResponse response = httpClient.execute(httpGet)) {// 判断响应状态码是否为200(成功)if (response.getStatusLine().getStatusCode() == 200) {// 获取响应内容并转换为字符串String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);log.info("GET Response body: {}", result);return result;}}} catch (Exception e) {log.error("发送GET请求出错: ", e);}return null;}
}
允许远程调用
通过二次开发xxl-job-admin,允许远程调用,修改方法如下:
通过发送请求并,走的流程,可以直到调用接口的时候是通过请求参数中的XXL_JOB_LOGIN_IDENTITY 进行校验,我们可以通过当我们发送的请求参数,直接获取即可cookieToken,即可。
PS:由于博主已经实验成功过了,具体方法,大家可以参考参考即可。博主就不走结果啦。
----------------------------持续更新中----------------------------